| #!/usr/bin/python |
| |
| import os |
| import sys |
| import random |
| import subprocess as sub |
| import getopt |
| import re |
| |
| def identity(x): |
| return x |
| |
| def cygpath(x): |
| command = ["cygpath", "-wp", x] |
| p = sub.Popen(command,stdout=sub.PIPE) |
| output, errors = p.communicate() |
| lines = output.split("\n") |
| return lines[0] |
| |
| if sys.platform == "cygwin": |
| normclasspath = cygpath |
| else: |
| normclasspath = identity |
| |
| CONF_DIR = os.path.expanduser("~/.storm") |
| STORM_DIR = "/".join(os.path.realpath( __file__ ).split("/")[:-2]) |
| CONFIG_OPTS = [] |
| CONFFILE = "" |
| |
| def get_config_opts(): |
| global CONFIG_OPTS |
| return "-Dstorm.options=" + (','.join(CONFIG_OPTS)).replace(' ', "%%%%") |
| |
| if not os.path.exists(STORM_DIR + "/RELEASE"): |
| print "******************************************" |
| print "The storm client can only be run from within a release. You appear to be trying to run the client from a checkout of Storm's source code." |
| print "\nYou can download a Storm release at https://github.com/nathanmarz/storm/downloads" |
| print "******************************************" |
| sys.exit(1) |
| |
| def get_jars_full(adir): |
| files = os.listdir(adir) |
| ret = [] |
| for f in files: |
| if f.endswith(".jar"): |
| ret.append(adir + "/" + f) |
| return ret |
| |
| def get_classpath(extrajars): |
| ret = get_jars_full(STORM_DIR) |
| ret.extend(get_jars_full(STORM_DIR + "/lib")) |
| ret.extend(extrajars) |
| return normclasspath(":".join(ret)) |
| |
| def confvalue(name, extrapaths): |
| global CONFFILE |
| command = [ |
| "java", "-client", get_config_opts(), "-Dstorm.conf.file=" + CONFFILE, "-cp", get_classpath(extrapaths), "backtype.storm.command.config_value", name |
| ] |
| p = sub.Popen(command, stdout=sub.PIPE) |
| output, errors = p.communicate() |
| lines = output.split("\n") |
| for line in lines: |
| tokens = line.split(" ") |
| if tokens[0] == "VALUE:": |
| return " ".join(tokens[1:]) |
| return "" |
| |
| def print_localconfvalue(name): |
| """Syntax: [storm localconfvalue conf-name] |
| |
| Prints out the value for conf-name in the local Storm configs. |
| The local Storm configs are the ones in ~/.storm/storm.yaml merged |
| in with the configs in defaults.yaml. |
| """ |
| print name + ": " + confvalue(name, [CONF_DIR]) |
| |
| def print_remoteconfvalue(name): |
| """Syntax: [storm remoteconfvalue conf-name] |
| |
| Prints out the value for conf-name in the cluster's Storm configs. |
| The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml |
| merged in with the configs in defaults.yaml. |
| |
| This command must be run on a cluster machine. |
| """ |
| print name + ": " + confvalue(name, [STORM_DIR + "/conf"]) |
| |
| def parse_args(string): |
| r"""Takes a string of whitespace-separated tokens and parses it into a list. |
| Whitespace inside tokens may be quoted with single quotes, double quotes or |
| backslash (similar to command-line arguments in bash). |
| |
| >>> parse_args(r'''"a a" 'b b' c\ c "d'd" 'e"e' 'f\'f' "g\"g" "i""i" 'j''j' k" "k l' l' mm n\\n''') |
| ['a a', 'b b', 'c c', "d'd", 'e"e', "f'f", 'g"g', 'ii', 'jj', 'k k', 'l l', 'mm', r'n\n'] |
| """ |
| re_split = re.compile(r'''((?: |
| [^\s"'\\] | |
| "(?: [^"\\] | \\.)*" | |
| '(?: [^'\\] | \\.)*' | |
| \\. |
| )+)''', re.VERBOSE) |
| args = re_split.split(string)[1::2] |
| args = [re.compile(r'"((?:[^"\\]|\\.)*)"').sub('\\1', x) for x in args] |
| args = [re.compile(r"'((?:[^'\\]|\\.)*)'").sub('\\1', x) for x in args] |
| return [re.compile(r'\\(.)').sub('\\1', x) for x in args] |
| |
| def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False): |
| global CONFFILE |
| all_args = [ |
| "java", jvmtype, get_config_opts(), |
| "-Dstorm.home=" + STORM_DIR, |
| "-Djava.library.path=" + confvalue("java.library.path", extrajars), |
| "-Dstorm.conf.file=" + CONFFILE, |
| "-cp", get_classpath(extrajars), |
| ] + jvmopts + [klass] + list(args) |
| print "Running: " + " ".join(all_args) |
| if fork: |
| os.spawnvp(os.P_WAIT, "java", all_args) |
| else: |
| os.execvp("java", all_args) # replaces the current process and never returns |
| |
| def jar(jarfile, klass, *args): |
| """Syntax: [storm jar topology-jar-path class ...] |
| |
| Runs the main method of class with the specified arguments. |
| The storm jars and configs in ~/.storm are put on the classpath. |
| The process is configured so that StormSubmitter |
| (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html) |
| will upload the jar at topology-jar-path when the topology is submitted. |
| """ |
| exec_storm_class( |
| klass, |
| jvmtype="-client", |
| extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"], |
| args=args, |
| jvmopts=["-Dstorm.jar=" + jarfile]) |
| |
| def kill(*args): |
| """Syntax: [storm kill topology-name [-w wait-time-secs]] |
| |
| Kills the topology with the name topology-name. Storm will |
| first deactivate the topology's spouts for the duration of |
| the topology's message timeout to allow all messages currently |
| being processed to finish processing. Storm will then shutdown |
| the workers and clean up their state. You can override the length |
| of time Storm waits between deactivation and shutdown with the -w flag. |
| """ |
| exec_storm_class( |
| "backtype.storm.command.kill_topology", |
| args=args, |
| jvmtype="-client", |
| extrajars=[CONF_DIR, STORM_DIR + "/bin"]) |
| |
| def activate(*args): |
| """Syntax: [storm activate topology-name] |
| |
| Activates the specified topology's spouts. |
| """ |
| exec_storm_class( |
| "backtype.storm.command.activate", |
| args=args, |
| jvmtype="-client", |
| extrajars=[CONF_DIR, STORM_DIR + "/bin"]) |
| |
| def listtopos(*args): |
| """Syntax: [storm list] |
| |
| List the running topologies and their statuses. |
| """ |
| exec_storm_class( |
| "backtype.storm.command.list", |
| args=args, |
| jvmtype="-client", |
| extrajars=[CONF_DIR, STORM_DIR + "/bin"]) |
| |
| def deactivate(*args): |
| """Syntax: [storm deactivate topology-name] |
| |
| Deactivates the specified topology's spouts. |
| """ |
| exec_storm_class( |
| "backtype.storm.command.deactivate", |
| args=args, |
| jvmtype="-client", |
| extrajars=[CONF_DIR, STORM_DIR + "/bin"]) |
| |
| def rebalance(*args): |
| """Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*] |
| |
| Sometimes you may wish to spread out where the workers for a topology |
| are running. For example, let's say you have a 10 node cluster running |
| 4 workers per node, and then let's say you add another 10 nodes to |
| the cluster. You may wish to have Storm spread out the workers for the |
| running topology so that each node runs 2 workers. One way to do this |
| is to kill the topology and resubmit it, but Storm provides a "rebalance" |
| command that provides an easier way to do this. |
| |
| Rebalance will first deactivate the topology for the duration of the |
| message timeout (overridable with the -w flag) and then redistribute |
| the workers evenly around the cluster. The topology will then return to |
| its previous state of activation (so a deactivated topology will still |
| be deactivated and an activated topology will go back to being activated). |
| |
| The rebalance command can also be used to change the parallelism of a running topology. |
| Use the -n and -e switches to change the number of workers or number of executors of a component |
| respectively. |
| """ |
| exec_storm_class( |
| "backtype.storm.command.rebalance", |
| args=args, |
| jvmtype="-client", |
| extrajars=[CONF_DIR, STORM_DIR + "/bin"]) |
| |
| def shell(resourcesdir, command, *args): |
| tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar" |
| os.system("jar cf %s %s" % (tmpjarpath, resourcesdir)) |
| runnerargs = [tmpjarpath, command] |
| runnerargs.extend(args) |
| exec_storm_class( |
| "backtype.storm.command.shell_submission", |
| args=runnerargs, |
| jvmtype="-client", |
| extrajars=[CONF_DIR], |
| fork=True) |
| os.system("rm " + tmpjarpath) |
| |
| def repl(): |
| """Syntax: [storm repl] |
| |
| Opens up a Clojure REPL with the storm jars and configuration |
| on the classpath. Useful for debugging. |
| """ |
| cppaths = [STORM_DIR + "/conf"] |
| exec_storm_class("clojure.lang.Repl", jvmtype="-client", extrajars=cppaths) |
| |
| def nimbus(klass="backtype.storm.daemon.nimbus"): |
| """Syntax: [storm nimbus] |
| |
| Launches the nimbus daemon. This command should be run under |
| supervision with a tool like daemontools or monit. |
| |
| See Setting up a Storm cluster for more information. |
| (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster) |
| """ |
| cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"] |
| jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [ |
| "-Dlogfile.name=nimbus.log", |
| "-Dlog4j.configuration=storm.log.properties", |
| ] |
| exec_storm_class( |
| klass, |
| jvmtype="-server", |
| extrajars=cppaths, |
| jvmopts=jvmopts) |
| |
| def supervisor(klass="backtype.storm.daemon.supervisor"): |
| """Syntax: [storm supervisor] |
| |
| Launches the supervisor daemon. This command should be run |
| under supervision with a tool like daemontools or monit. |
| |
| See Setting up a Storm cluster for more information. |
| (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster) |
| """ |
| cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"] |
| jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [ |
| "-Dlogfile.name=supervisor.log", |
| "-Dlog4j.configuration=storm.log.properties", |
| ] |
| exec_storm_class( |
| klass, |
| jvmtype="-server", |
| extrajars=cppaths, |
| jvmopts=jvmopts) |
| |
| def ui(): |
| """Syntax: [storm ui] |
| |
| Launches the UI daemon. The UI provides a web interface for a Storm |
| cluster and shows detailed stats about running topologies. This command |
| should be run under supervision with a tool like daemontools or monit. |
| |
| See Setting up a Storm cluster for more information. |
| (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster) |
| """ |
| cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"] |
| jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [ |
| "-Dlogfile.name=ui.log", |
| "-Dlog4j.configuration=storm.log.properties", |
| ] |
| exec_storm_class( |
| "backtype.storm.ui.core", |
| jvmtype="-server", |
| jvmopts=jvmopts, |
| extrajars=[STORM_DIR + "/log4j", STORM_DIR, STORM_DIR + "/conf"]) |
| |
| def drpc(): |
| """Syntax: [storm drpc] |
| |
| Launches a DRPC daemon. This command should be run under supervision |
| with a tool like daemontools or monit. |
| |
| See Distributed RPC for more information. |
| (https://github.com/nathanmarz/storm/wiki/Distributed-RPC) |
| """ |
| jvmopts = ["-Xmx768m", "-Dlogfile.name=drpc.log", "-Dlog4j.configuration=storm.log.properties"] |
| exec_storm_class( |
| "backtype.storm.daemon.drpc", |
| jvmtype="-server", |
| jvmopts=jvmopts, |
| extrajars=[STORM_DIR + "/log4j", STORM_DIR + "/conf"]) |
| |
| def dev_zookeeper(): |
| """Syntax: [storm dev-zookeeper] |
| |
| Launches a fresh Zookeeper server using "dev.zookeeper.path" as its local dir and |
| "storm.zookeeper.port" as its port. This is only intended for development/testing, the |
| Zookeeper instance launched is not configured to be used in production. |
| """ |
| cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"] |
| exec_storm_class( |
| "backtype.storm.command.dev_zookeeper", |
| jvmtype="-server", |
| extrajars=[STORM_DIR + "/log4j", STORM_DIR + "/conf"]) |
| |
| def version(): |
| """Syntax: [storm version] |
| |
| Prints the version number of this Storm release. |
| """ |
| releasefile = STORM_DIR + "/RELEASE" |
| if os.path.exists(releasefile): |
| print open(releasefile).readline().strip() |
| else: |
| print "Unknown" |
| |
| def print_classpath(): |
| """Syntax: [storm classpath] |
| |
| Prints the classpath used by the storm client when running commands. |
| """ |
| print get_classpath([]) |
| |
| def print_commands(): |
| """Print all client commands and link to documentation""" |
| print "Commands:\n\t", "\n\t".join(sorted(COMMANDS.keys())) |
| print "\nHelp:", "\n\thelp", "\n\thelp <command>" |
| print "\nDocumentation for the storm client can be found at https://github.com/nathanmarz/storm/wiki/Command-line-client\n" |
| print "Configs can be overridden using one or more -c flags, e.g. \"storm list -c nimbus.host=nimbus.mycompany.com\"\n" |
| |
| def print_usage(command=None): |
| """Print one help message or list of available commands""" |
| if command != None: |
| if COMMANDS.has_key(command): |
| print (COMMANDS[command].__doc__ or |
| "No documentation provided for <%s>" % command) |
| else: |
| print "<%s> is not a valid command" % command |
| else: |
| print_commands() |
| |
| def unknown_command(*args): |
| print "Unknown command: [storm %s]" % ' '.join(sys.argv[1:]) |
| print_usage() |
| |
| COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, |
| "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue, |
| "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath, |
| "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage, |
| "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version} |
| |
| def parse_config(config_list): |
| global CONFIG_OPTS |
| if len(config_list) > 0: |
| for config in config_list: |
| CONFIG_OPTS.append(config) |
| |
| def parse_config_opts(args): |
| curr = args[:] |
| curr.reverse() |
| config_list = [] |
| args_list = [] |
| |
| while len(curr) > 0: |
| token = curr.pop() |
| if token == "-c": |
| config_list.append(curr.pop()) |
| elif token == "--config": |
| global CONFFILE |
| CONFFILE = curr.pop() |
| else: |
| args_list.append(token) |
| |
| return config_list, args_list |
| |
| def main(): |
| if len(sys.argv) <= 1: |
| print_usage() |
| sys.exit(-1) |
| global CONFIG_OPTS |
| config_list, args = parse_config_opts(sys.argv[1:]) |
| parse_config(config_list) |
| COMMAND = args[0] |
| ARGS = args[1:] |
| (COMMANDS.get(COMMAND, unknown_command))(*ARGS) |
| |
| if __name__ == "__main__": |
| main() |