| #!/usr/bin/python |
| |
| import os |
| import sys |
| import random |
| import subprocess as sub |
| |
def identity(x):
    """Return ``x`` unchanged (the no-op classpath normalizer)."""
    return x
| |
def cygpath(x):
    """Translate a path list with the external ``cygpath`` tool.

    Runs ``cygpath -wp x`` and returns the first line of what it writes
    to standard output.
    """
    proc = sub.Popen(["cygpath", "-wp", x], stdout=sub.PIPE)
    stdout_text = proc.communicate()[0]
    # Only the text before the first newline is the converted path.
    return stdout_text.split("\n")[0]
| |
# Under Cygwin the classpath is run through ``cygpath -wp``; on every other
# platform it is used exactly as built.
normclasspath = cygpath if sys.platform == "cygwin" else identity
| |
# Absolute path of this script with its last two "/"-separated components
# (the file name and its containing directory) removed.
STORM_DIR = "/".join(os.path.abspath(__file__).split("/")[:-2])
# Per-user configuration directory.
CONF_DIR = os.path.expanduser("~/.storm")
| |
# Refuse to run unless a RELEASE marker file exists directly under STORM_DIR.
if not os.path.exists(STORM_DIR + "/RELEASE"):
    # Emitted as one write; the bytes match four consecutive prints.
    print("\n".join([
        "******************************************",
        "The storm client can only be run from within a release. You appear to be trying to run the client from a checkout of Storm's source code.",
        "\nYou can download a Storm release at https://github.com/nathanmarz/storm/downloads",
        "******************************************",
    ]))
    sys.exit(1)
| |
def get_jars_full(adir):
    """Return the paths of the ``.jar`` entries directly inside ``adir``.

    Each result is ``adir + "/" + name``; the order is whatever
    ``os.listdir`` reports. Subdirectories are not searched.
    """
    return [
        adir + "/" + entry
        for entry in os.listdir(adir)
        if entry.endswith(".jar")
    ]
| |
def get_classpath(extrajars):
    """Build the ":"-separated classpath string used to launch the JVM.

    The entries are, in order: jars directly under STORM_DIR, jars under
    STORM_DIR/lib, then ``extrajars``. The joined string is passed through
    ``normclasspath`` before being returned.
    """
    entries = get_jars_full(STORM_DIR) + get_jars_full(STORM_DIR + "/lib")
    entries.extend(extrajars)
    joined = ":".join(entries)
    return normclasspath(joined)
| |
def confvalue(name, extrapaths):
    """Look up a configuration value by running the Java helper class.

    Runs ``java -client -cp <classpath> backtype.storm.command.config_value
    name`` and scans its standard output for the first line whose first
    space-separated token is ``VALUE:``.

    name -- configuration key handed to the helper
    extrapaths -- extra classpath entries passed to ``get_classpath``

    Returns the remainder of that line (tokens re-joined with single
    spaces), or "" when the helper printed no ``VALUE:`` line.
    """
    command = [
        "java", "-client", "-cp", get_classpath(extrapaths),
        "backtype.storm.command.config_value", name
    ]
    p = sub.Popen(command, stdout=sub.PIPE)
    output, _ = p.communicate()
    for line in output.split("\n"):
        tokens = line.split(" ")
        if tokens[0] == "VALUE:":
            return " ".join(tokens[1:])
    # Callers concatenate the result with other strings, so fall back to an
    # empty string rather than an implicit None.
    return ""
| |
def print_localconfvalue(name):
    """Syntax: [storm localconfvalue conf-name]

    Prints out the value for conf-name in the local Storm configs.
    The local Storm configs are the ones in ~/.storm/storm.yaml merged
    in with the configs in defaults.yaml.
    """
    # The docstring above doubles as the "help localconfvalue" text.
    value = confvalue(name, [CONF_DIR])
    print(name + ": " + value)
| |
def print_remoteconfvalue(name):
    """Syntax: [storm remoteconfvalue conf-name]

    Prints out the value for conf-name in the cluster's Storm configs.
    The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml
    merged in with the configs in defaults.yaml.

    This command must be run on a cluster machine.
    """
    # The docstring above doubles as the "help remoteconfvalue" text.
    cluster_conf_dir = STORM_DIR + "/conf"
    value = confvalue(name, [cluster_conf_dir])
    print(name + ": " + value)
| |
def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=None, args=None):
    """Run a Java class in a new JVM with the Storm jars on the classpath.

    klass -- name of the class to run
    jvmtype -- JVM selection flag(s), e.g. "-server" or "-client"
    childopts -- string of extra JVM options, split into separate
                 arguments using shell-like quoting rules
    extrajars -- extra classpath entries (defaults to none)
    args -- arguments passed to klass (defaults to none)

    The command line is echoed to stdout and then executed directly,
    without a shell, so arguments and paths containing spaces or shell
    metacharacters reach the JVM unchanged. Blocks until the JVM exits.
    """
    import shlex

    extrajars = [] if extrajars is None else list(extrajars)
    args = [] if args is None else list(args)

    nativepath = confvalue("java.library.path", extrajars)
    command = ["java"]
    # Option strings may hold several space-separated flags.
    command.extend(shlex.split(jvmtype))
    command.append("-Dstorm.home=" + STORM_DIR)
    command.append("-Djava.library.path=" + nativepath)
    command.extend(shlex.split(childopts))
    command.extend(["-cp", get_classpath(extrajars), klass])
    command.extend(args)
    print("Running: " + " ".join(command))
    sub.call(command)
| |
def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]

    Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    # The jar itself goes on the classpath and is also named in -Dstorm.jar.
    classpath_extras = [jarfile, CONF_DIR, STORM_DIR + "/bin"]
    submit_opts = "-Dstorm.jar=" + jarfile
    exec_storm_class(
        klass,
        jvmtype="-client",
        childopts=submit_opts,
        extrajars=classpath_extras,
        args=args)
| |
def kill(*args):
    """Syntax: [storm kill topology-name [-w wait-time-secs]]

    Kills the topology with the name topology-name. Storm will
    first deactivate the topology's spouts for the duration of
    the topology's message timeout to allow all messages currently
    being processed to finish processing. Storm will then shutdown
    the workers and clean up their state. You can override the length
    of time Storm waits between deactivation and shutdown with the -w flag.
    """
    # All command-line arguments are forwarded untouched to the Java class.
    client_classpath = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class(
        "backtype.storm.command.kill_topology",
        jvmtype="-client",
        extrajars=client_classpath,
        args=args)
| |
def activate(*args):
    """Syntax: [storm activate topology-name]

    Activates the specified topology's spouts.
    """
    # All command-line arguments are forwarded untouched to the Java class.
    client_classpath = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class(
        "backtype.storm.command.activate",
        jvmtype="-client",
        extrajars=client_classpath,
        args=args)
| |
def deactivate(*args):
    """Syntax: [storm deactivate topology-name]

    Deactivates the specified topology's spouts.
    """
    # All command-line arguments are forwarded untouched to the Java class.
    client_classpath = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class(
        "backtype.storm.command.deactivate",
        jvmtype="-client",
        extrajars=client_classpath,
        args=args)
| |
def rebalance(*args):
    """Syntax: [storm rebalance topology-name [-w wait-time-secs]]

    Sometimes you may wish to spread out where the workers for a topology
    are running. For example, let's say you have a 10 node cluster running
    4 workers per node, and then let's say you add another 10 nodes to
    the cluster. You may wish to have Storm spread out the workers for the
    running topology so that each node runs 2 workers. One way to do this
    is to kill the topology and resubmit it, but Storm provides a "rebalance"
    command that provides an easier way to do this.

    Rebalance will first deactivate the topology for the duration of the
    message timeout (overridable with the -w flag) and then redistribute
    the workers evenly around the cluster. The topology will then return to
    its previous state of activation (so a deactivated topology will still
    be deactivated and an activated topology will go back to being activated).
    """
    # All command-line arguments are forwarded untouched to the Java class.
    client_classpath = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class(
        "backtype.storm.command.rebalance",
        jvmtype="-client",
        extrajars=client_classpath,
        args=args)
| |
def shell(resourcesdir, command, *args):
    """Syntax: [storm shell resourcesdir command args]

    Packages resourcesdir into a temporary jar in the current directory
    and runs backtype.storm.command.shell_submission with the jar path,
    command and args. The temporary jar is deleted afterwards.
    """
    tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
    try:
        # Argument list instead of a shell string, so a resources path
        # containing spaces is passed to `jar` as a single argument.
        sub.call(["jar", "cf", tmpjarpath, resourcesdir])
        runnerargs = [tmpjarpath, command]
        runnerargs.extend(args)
        exec_storm_class(
            "backtype.storm.command.shell_submission",
            args=runnerargs,
            jvmtype="-client",
            extrajars=[CONF_DIR])
    finally:
        # Clean up even when the run above raises; the jar may be missing
        # if `jar cf` itself failed.
        if os.path.exists(tmpjarpath):
            os.remove(tmpjarpath)
| |
def repl():
    """Syntax: [storm repl]

    Opens up a Clojure REPL with the storm jars and configuration
    on the classpath. Useful for debugging.
    """
    # Only the conf directory is added to the default Storm classpath.
    exec_storm_class(
        "clojure.lang.Repl",
        extrajars=[STORM_DIR + "/conf"],
        jvmtype="-client")
| |
def nimbus():
    """Syntax: [storm nimbus]

    Launches the nimbus daemon. This command should be run under
    supervision with a tool like daemontools or monit.

    See Setting up a Storm cluster for more information.
    (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
    """
    cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    # JVM options from the "nimbus.childopts" config come first, followed by
    # the fixed logging flags.
    configured_opts = confvalue("nimbus.childopts", cppaths)
    logging_opts = " -Dlogfile.name=nimbus.log -Dlog4j.configuration=storm.log.properties"
    exec_storm_class(
        "backtype.storm.daemon.nimbus",
        childopts=configured_opts + logging_opts,
        extrajars=cppaths,
        jvmtype="-server")
| |
def supervisor():
    """Syntax: [storm supervisor]

    Launches the supervisor daemon. This command should be run
    under supervision with a tool like daemontools or monit.

    See Setting up a Storm cluster for more information.
    (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
    """
    cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    # JVM options from the "supervisor.childopts" config come first, followed
    # by the fixed logging flags.
    configured_opts = confvalue("supervisor.childopts", cppaths)
    logging_opts = " -Dlogfile.name=supervisor.log -Dlog4j.configuration=storm.log.properties"
    exec_storm_class(
        "backtype.storm.daemon.supervisor",
        childopts=configured_opts + logging_opts,
        extrajars=cppaths,
        jvmtype="-server")
| |
def ui():
    """Syntax: [storm ui]

    Launches the UI daemon. The UI provides a web interface for a Storm
    cluster and shows detailed stats about running topologies. This command
    should be run under supervision with a tool like daemontools or monit.

    See Setting up a Storm cluster for more information.
    (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
    """
    log4j_dir = STORM_DIR + "/log4j"
    conf_dir = STORM_DIR + "/conf"
    # The config lookup uses only the log4j and conf directories; the daemon
    # itself additionally gets STORM_DIR on its classpath.
    configured_opts = confvalue("ui.childopts", [log4j_dir, conf_dir])
    logging_opts = " -Dlogfile.name=ui.log -Dlog4j.configuration=storm.log.properties"
    exec_storm_class(
        "backtype.storm.ui.core",
        jvmtype="-server",
        extrajars=[log4j_dir, STORM_DIR, conf_dir],
        childopts=configured_opts + logging_opts)
| |
def drpc():
    """Syntax: [storm drpc]

    Launches a DRPC daemon. This command should be run under supervision
    with a tool like daemontools or monit.

    See Distributed RPC for more information.
    (https://github.com/nathanmarz/storm/wiki/Distributed-RPC)
    """
    # Unlike the other daemons, the JVM options here are fixed rather than
    # read from the configuration.
    daemon_classpath = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    exec_storm_class(
        "backtype.storm.daemon.drpc",
        childopts="-Xmx768m -Dlogfile.name=drpc.log -Dlog4j.configuration=storm.log.properties",
        extrajars=daemon_classpath,
        jvmtype="-server")
| |
def print_classpath():
    """Syntax: [storm classpath]

    Prints the classpath used by the storm client when running commands.
    """
    # No extra entries: just the jars under STORM_DIR and STORM_DIR/lib.
    classpath = get_classpath([])
    print(classpath)
| |
def print_commands():
    """Print all client commands and link to documentation"""
    command_names = sorted(COMMANDS.keys())
    # Each message is a single pre-joined string. The spacing reproduces what
    # the Python 2 comma-separated print statement emits: no separator after
    # an item ending in a tab, one space after any other item.
    print("Commands:\n\t" + "\n\t".join(command_names))
    print("\nHelp: \n\thelp \n\thelp <command>")
    print("\nDocumentation for the storm client can be found at https://github.com/nathanmarz/storm/wiki/Command-line-client\n")
| |
def print_usage(command=None):
    """Print one help message or list of available commands"""
    # With no command given, list every command. Otherwise print that
    # command's docstring, a placeholder if it has none, or an error if the
    # name is not registered in COMMANDS.
    if command is None:
        print_commands()
    elif command in COMMANDS:
        print(COMMANDS[command].__doc__ or
              "No documentation provided for <%s>" % command)
    else:
        print("<%s> is not a valid command" % command)
| |
def unknown_command(*args):
    """Report an unrecognized command line, then print the command list.

    The message is built from ``sys.argv[1:]``; ``args`` is accepted so the
    function can be called like any other command handler but is not used.
    """
    attempted = ' '.join(sys.argv[1:])
    print("Unknown command: [storm %s]" % attempted)
    print_usage()
| |
# Dispatch table: command name given on the command line -> handler function.
COMMANDS = {
    "jar": jar,
    "kill": kill,
    "shell": shell,
    "nimbus": nimbus,
    "ui": ui,
    "drpc": drpc,
    "supervisor": supervisor,
    "localconfvalue": print_localconfvalue,
    "remoteconfvalue": print_remoteconfvalue,
    "repl": repl,
    "classpath": print_classpath,
    "activate": activate,
    "deactivate": deactivate,
    "rebalance": rebalance,
    "help": print_usage,
}
| |
def main():
    """Dispatch ``sys.argv`` to the matching command handler.

    With no command, prints the usage text and exits with status -1.
    Otherwise the first argument selects a handler from COMMANDS (falling
    back to ``unknown_command``) and the remaining arguments are passed to
    it positionally.
    """
    if len(sys.argv) < 2:
        print_usage()
        sys.exit(-1)

    command_name, command_args = sys.argv[1], sys.argv[2:]
    handler = COMMANDS.get(command_name, unknown_command)
    handler(*command_args)


if __name__ == "__main__":
    main()