| #!/usr/bin/python |
| |
| import os |
| import sys |
| import random |
| import subprocess as sub |
| |
def identity(x):
    """Return x unchanged (the no-op classpath normalizer)."""
    return x
| |
def cygpath(x):
    """Run ``cygpath -wp x`` and return the first line of its stdout."""
    proc = sub.Popen(["cygpath", "-wp", x], stdout=sub.PIPE)
    # Only stdout is piped, so the stderr half of communicate() is unused.
    stdout_text = proc.communicate()[0]
    return stdout_text.split("\n")[0]
| |
# Under Cygwin, classpaths are passed through cygpath(); on every other
# platform they are used unchanged.
normclasspath = cygpath if sys.platform == "cygwin" else identity

# Per-user configuration directory.
CONF_DIR = os.path.expanduser("~/.storm")
# Absolute path of this script with its last two components removed, i.e. the
# parent of the directory that contains the script.
STORM_DIR = "/".join(os.path.abspath(__file__).split("/")[:-2])

# Refuse to run unless a RELEASE marker file exists directly under STORM_DIR.
if not os.path.exists(STORM_DIR + "/RELEASE"):
    print("\n".join([
        "******************************************",
        "The storm client can only be run from within a release. You appear to be trying to run the client from a checkout of Storm's source code.",
        "\nYou can download a Storm release at https://github.com/nathanmarz/storm/downloads",
        "******************************************",
    ]))
    sys.exit(1)
| |
def get_jars_full(adir):
    """Return ``adir + "/" + name`` for every entry of adir ending in ".jar".

    Entries come back in the order os.listdir() yields them; the listing is
    not recursive.
    """
    return [adir + "/" + entry
            for entry in os.listdir(adir)
            if entry.endswith(".jar")]
| |
def get_classpath(extrajars):
    """Build the Java classpath string used to launch Storm classes.

    The classpath is the jars directly under STORM_DIR, then the jars under
    STORM_DIR/lib, then the caller-supplied ``extrajars`` entries, joined with
    ":" and passed through ``normclasspath``.
    """
    entries = get_jars_full(STORM_DIR) + get_jars_full(STORM_DIR + "/lib")
    entries += extrajars
    return normclasspath(":".join(entries))
| |
def confvalue(name, extrapaths):
    """Look up the configuration value ``name`` by running a helper JVM.

    Runs ``backtype.storm.command.config_value`` with ``extrapaths`` appended
    to the classpath and scans its stdout for a line of the form
    ``VALUE: <value>``.

    Returns everything after ``"VALUE: "`` on the first such line, so values
    that contain spaces (for example several JVM options) are returned whole.
    Returns ``""`` when no ``VALUE:`` line is present, which keeps the string
    concatenation done by callers from failing on None.
    """
    cp = get_classpath(extrapaths)
    command = ["java", "-client", "-cp", cp, "backtype.storm.command.config_value", name]
    p = sub.Popen(command, stdout=sub.PIPE)
    output = p.communicate()[0]
    # splitlines() also strips "\r\n" endings, so a trailing "\r" never ends
    # up inside the returned value.
    for line in output.splitlines():
        marker, _, value = line.partition(" ")
        if marker == "VALUE:":
            return value
    return ""
| |
def print_localconfvalue(name):
    """Print ``name: value`` with the value resolved using CONF_DIR on the classpath."""
    value = confvalue(name, [CONF_DIR])
    print(name + ": " + value)
| |
def print_remoteconfvalue(name):
    """Print ``name: value`` with the value resolved using STORM_DIR/conf on the classpath."""
    value = confvalue(name, [STORM_DIR + "/conf"])
    print(name + ": " + value)
| |
def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=None, args=None):
    """Launch the Java class ``klass`` in a new JVM through the shell.

    Parameters:
        klass:     fully qualified name of the class to run.
        jvmtype:   JVM mode flag placed right after ``java``.
        childopts: extra JVM options, inserted verbatim into the command line.
        extrajars: additional classpath entries (defaults to none).
        args:      sequence of string arguments passed to ``klass``
                   (defaults to none).

    The command line is printed before it is executed. The exit status of
    the JVM is not returned.
    """
    # None sentinels replace the former mutable list defaults.
    extrajars = [] if extrajars is None else extrajars
    args = [] if args is None else args

    nativepath = confvalue("java.library.path", extrajars)
    # NOTE: the command is a single shell string handed to os.system(), so
    # paths or arguments containing spaces or shell metacharacters are not
    # quoted.
    command = " ".join([
        "java",
        jvmtype,
        "-Djava.library.path=" + nativepath,
        childopts,
        "-cp",
        get_classpath(extrajars),
        klass,
        " ".join(args),
    ])
    print("Running: " + command)
    os.system(command)
| |
def jar(jarfile, klass, *args):
    """Run ``klass`` in a client JVM with ``jarfile`` on the classpath.

    ``jarfile`` is also exposed to the JVM as the ``storm.jar`` system
    property; ``args`` are forwarded to ``klass``.
    """
    classpath_extras = [jarfile, CONF_DIR, STORM_DIR + "/bin"]
    storm_jar_opt = "-Dstorm.jar=" + jarfile
    exec_storm_class(klass, jvmtype="-client", childopts=storm_jar_opt,
                     extrajars=classpath_extras, args=args)
| |
def kill(*args):
    """Run backtype.storm.command.kill_topology in a client JVM, forwarding ``args``."""
    exec_storm_class(
        "backtype.storm.command.kill_topology",
        jvmtype="-client",
        extrajars=[CONF_DIR, STORM_DIR + "/bin"],
        args=args)
| |
def activate(*args):
    """Run backtype.storm.command.activate in a client JVM, forwarding ``args``."""
    exec_storm_class(
        "backtype.storm.command.activate",
        jvmtype="-client",
        extrajars=[CONF_DIR, STORM_DIR + "/bin"],
        args=args)
| |
def deactivate(*args):
    """Run backtype.storm.command.deactivate in a client JVM, forwarding ``args``."""
    exec_storm_class(
        "backtype.storm.command.deactivate",
        jvmtype="-client",
        extrajars=[CONF_DIR, STORM_DIR + "/bin"],
        args=args)
| |
def rebalance(*args):
    """Run backtype.storm.command.rebalance in a client JVM, forwarding ``args``."""
    exec_storm_class(
        "backtype.storm.command.rebalance",
        jvmtype="-client",
        extrajars=[CONF_DIR, STORM_DIR + "/bin"],
        args=args)
| |
def shell(resourcesdir, command, *args):
    """Package ``resourcesdir`` into a temporary jar and run shell_submission.

    A jar named ``stormshell<random>.jar`` is created in the current working
    directory with ``jar cf``. backtype.storm.command.shell_submission is then
    run in a client JVM with the jar path, ``command`` and ``args`` as its
    arguments. The temporary jar is removed afterwards, even when the
    submission step raises.
    """
    tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
    os.system("jar cf %s %s" % (tmpjarpath, resourcesdir))
    runnerargs = [tmpjarpath, command]
    runnerargs.extend(args)
    try:
        exec_storm_class(
            "backtype.storm.command.shell_submission",
            args=runnerargs,
            jvmtype="-client",
            extrajars=[CONF_DIR])
    finally:
        # Always clean up so a failed submission does not leave the jar behind.
        os.system("rm " + tmpjarpath)
| |
def repl():
    """Run clojure.lang.Repl in a client JVM with STORM_DIR/conf on the classpath."""
    exec_storm_class(
        "clojure.lang.Repl",
        jvmtype="-client",
        extrajars=[STORM_DIR + "/conf"])
| |
def nimbus():
    """Run backtype.storm.daemon.nimbus in a server JVM.

    JVM options are the configured ``nimbus.childopts`` value followed by the
    log file name and log4j configuration properties.
    """
    cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    logging_opts = " -Dlogfile.name=nimbus.log -Dlog4j.configuration=storm.log.properties"
    jvm_opts = confvalue("nimbus.childopts", cppaths) + logging_opts
    exec_storm_class(
        "backtype.storm.daemon.nimbus",
        jvmtype="-server",
        extrajars=cppaths,
        childopts=jvm_opts)
| |
def supervisor():
    """Run backtype.storm.daemon.supervisor in a server JVM.

    JVM options are a configured childopts value followed by the log file
    name and log4j configuration properties.
    """
    cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    logging_opts = " -Dlogfile.name=supervisor.log -Dlog4j.configuration=storm.log.properties"
    # NOTE(review): this reads "nimbus.childopts", the same key nimbus() uses.
    # It looks like a copy-paste slip for a supervisor-specific key -- confirm
    # that such a key exists in the configuration before changing it.
    jvm_opts = confvalue("nimbus.childopts", cppaths) + logging_opts
    exec_storm_class(
        "backtype.storm.daemon.supervisor",
        jvmtype="-server",
        extrajars=cppaths,
        childopts=jvm_opts)
| |
def ui():
    """Run backtype.storm.ui.core in a server JVM with a fixed set of JVM options."""
    jvm_opts = "-Xmx768m -Dlogfile.name=ui.log -Dlog4j.configuration=storm.log.properties"
    classpath_extras = [STORM_DIR + "/log4j", STORM_DIR, STORM_DIR + "/conf"]
    exec_storm_class(
        "backtype.storm.ui.core",
        jvmtype="-server",
        childopts=jvm_opts,
        extrajars=classpath_extras)
| |
def drpc():
    """Run backtype.storm.daemon.drpc in a server JVM with a fixed set of JVM options."""
    jvm_opts = "-Xmx768m -Dlogfile.name=drpc.log -Dlog4j.configuration=storm.log.properties"
    classpath_extras = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    exec_storm_class(
        "backtype.storm.daemon.drpc",
        jvmtype="-server",
        childopts=jvm_opts,
        extrajars=classpath_extras)
| |
def print_classpath():
    """Print the classpath built by get_classpath() with no extra entries."""
    classpath = get_classpath([])
    print(classpath)
| |
# Dispatch table: command name given on the command line -> handler function.
COMMANDS = {
    "activate": activate,
    "classpath": print_classpath,
    "deactivate": deactivate,
    "drpc": drpc,
    "jar": jar,
    "kill": kill,
    "localconfvalue": print_localconfvalue,
    "nimbus": nimbus,
    "rebalance": rebalance,
    "remoteconfvalue": print_remoteconfvalue,
    "repl": repl,
    "shell": shell,
    "supervisor": supervisor,
    "ui": ui,
}
| |
def print_commands():
    """Print the available command names, sorted, followed by a documentation link.

    Each command name is printed on its own line, indented with a tab, under
    a ``Commands:`` heading.
    """
    # COMMANDS is only read here, so no ``global`` declaration is needed.
    print("Commands:\n\t" + "\n\t".join(sorted(COMMANDS)))
    print("\nDocumentation for the storm client can be found at https://github.com/nathanmarz/storm/wiki/Command-line-client\n")
| |
def print_usage(msg=None):
    """Print ``msg`` (when given) followed by the list of available commands."""
    # Identity comparison is the correct test for None.
    if msg is not None:
        print(msg)
    print_commands()
| |
def main():
    """Dispatch ``sys.argv`` to the matching handler in COMMANDS.

    With no command, or with a command name that is not in COMMANDS, the
    usage text is printed and the process exits with status -1. Otherwise
    the handler is called with the remaining command-line arguments.
    """
    if len(sys.argv) <= 1:
        print_usage()
        sys.exit(-1)

    command = sys.argv[1]
    handler = COMMANDS.get(command)
    if handler is None:
        # Report the problem instead of dying with a KeyError traceback.
        print_usage("Unknown command: " + command)
        sys.exit(-1)
    handler(*sys.argv[2:])


if __name__ == "__main__":
    main()