| #!/usr/bin/python |
| |
| import os |
| import sys |
| import random |
| import subprocess as sub |
| |
# Per-user directory (~/.storm); the client commands below put it on the classpath.
CONF_DIR = os.path.expanduser("~/.storm")

# Installation root: the absolute path of this script with its last two
# components (the file name and the directory that contains it) removed.
_SCRIPT_PATH_PARTS = os.path.abspath(__file__).split("/")
STORM_DIR = "/".join(_SCRIPT_PATH_PARTS[:-2])
| |
def get_jars_full(adir):
    """Return the paths of the ``.jar`` files located directly in ``adir``.

    Each result is ``adir`` + "/" + file name, in the order reported by
    ``os.listdir``. Subdirectories are not searched.
    """
    return [adir + "/" + name for name in os.listdir(adir) if name.endswith(".jar")]
| |
def get_classpath(extrajars):
    """Build the ``:``-separated classpath string for launching java.

    The classpath consists of, in order: the jars directly under
    ``STORM_DIR``, the jars under ``STORM_DIR/lib``, and finally every entry
    of ``extrajars`` (jar files or directories supplied by the caller).
    """
    entries = get_jars_full(STORM_DIR) + get_jars_full(STORM_DIR + "/lib")
    entries.extend(extrajars)
    return ":".join(entries)
| |
def confvalue(name):
    """Look up the configuration value ``name`` by running a helper java class.

    Runs ``backtype.storm.command.config_value`` with the default classpath
    and scans its standard output for the first line whose first
    space-separated token is ``VALUE:``.

    Returns:
        Everything after ``VALUE: `` on that line, as a string. Values that
        themselves contain spaces (for example several JVM options) are
        returned whole. If no ``VALUE:`` line is printed, an empty string is
        returned so that callers can safely concatenate the result.
    """
    command = ["java", "-client", "-cp", get_classpath([]),
               "backtype.storm.command.config_value", name]
    # universal_newlines=True makes communicate() return text (not bytes) on
    # every Python version, so the string splitting below always works.
    p = sub.Popen(command, stdout=sub.PIPE, universal_newlines=True)
    output, _ = p.communicate()
    for line in output.split("\n"):
        tokens = line.split(" ")
        if tokens[0] == "VALUE:":
            # Re-join the remaining tokens: the value may contain spaces.
            return " ".join(tokens[1:])
    return ""
| |
def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=[], args=[], prefix=""):
    """Assemble a ``java`` command line for ``klass`` and run it via the shell.

    Args:
        klass: Fully qualified name of the class to run.
        jvmtype: JVM selection flag placed right after ``java``.
        childopts: Extra JVM options, inserted verbatim.
        extrajars: Additional classpath entries appended after the default jars.
        args: Arguments for ``klass``; joined with single spaces.
        prefix: Shell text placed before ``java`` (e.g. an ``export ...;``).

    The command is echoed to standard output before it is executed with
    ``os.system``. The list defaults are only read, never modified.
    """
    nativepath = confvalue("java.library.path")
    jvm_part = prefix + " java " + jvmtype + " -Djava.library.path=" + nativepath
    command = (jvm_part + " " + childopts
               + " -cp " + get_classpath(extrajars)
               + " " + klass + " " + " ".join(args))
    # Parenthesised single-argument form prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print("Running: " + command)
    os.system(command)
| |
# The first command-line argument selects the sub-command; everything after it
# is forwarded to that sub-command. Exit with a usage hint instead of an
# IndexError traceback when no sub-command is given.
if len(sys.argv) < 2:
    sys.exit("Usage: " + os.path.basename(sys.argv[0]) + " <command> [args...]")
COMMAND = sys.argv[1]
ARGS = sys.argv[2:]
| |
def jar(jarfile, klass, *args):
    """Run ``klass`` from ``jarfile`` on a client JVM.

    ``jarfile``, ``CONF_DIR`` and ``STORM_DIR/bin`` are added to the
    classpath, ``args`` are passed through to ``klass``, and the shell
    variable ``STORM_JAR`` is exported as ``jarfile`` before java starts.
    """
    classpath_extras = [jarfile, CONF_DIR, STORM_DIR + "/bin"]
    export_prefix = "export STORM_JAR=" + jarfile + ";"
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=classpath_extras,
        args=args,
        childopts="-Dlog4j.configuration=storm.log.properties",
        prefix=export_prefix)
| |
def kill(name):
    """Run ``backtype.storm.command.kill_topology`` with ``name`` on a client JVM.

    ``CONF_DIR`` and ``STORM_DIR/bin`` are added to the classpath.
    """
    classpath_extras = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class(
        "backtype.storm.command.kill_topology",
        args=[name],
        jvmtype="-client",
        extrajars=classpath_extras,
        childopts="-Dlog4j.configuration=storm.log.properties")
| |
def shell(resourcesdir, command, *args):
    """Package ``resourcesdir`` into a temporary jar and submit it.

    A jar with a randomised name is created in the current directory from
    ``resourcesdir``. ``backtype.storm.command.shell_submission`` is then run
    on a client JVM with the jar path, ``command`` and ``args`` as its
    arguments. The temporary jar is removed afterwards, even if packaging
    or submission fails.

    Exits the process with an error message if the ``jar`` tool reports a
    failure.
    """
    tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
    try:
        # Pass an argument list (no shell) so a resources path containing
        # spaces or shell metacharacters reaches `jar` as one argument.
        if sub.call(["jar", "cf", tmpjarpath, resourcesdir]) != 0:
            sys.exit("Failed to package " + resourcesdir + " into " + tmpjarpath)
        runnerargs = [tmpjarpath, command]
        runnerargs.extend(args)
        exec_storm_class(
            "backtype.storm.command.shell_submission",
            args=runnerargs,
            jvmtype="-client",
            extrajars=[CONF_DIR],
            childopts="-Dlog4j.configuration=storm.log.properties")
    finally:
        # `jar` may have failed before creating the file, so check first.
        if os.path.exists(tmpjarpath):
            os.remove(tmpjarpath)
| |
def nimbus():
    """Start ``backtype.storm.daemon.nimbus`` on a server JVM.

    JVM options are the configured ``nimbus.childopts`` followed by
    ``-Dlogfile.name=nimbus.log``; ``STORM_DIR/log4j`` and ``STORM_DIR/conf``
    are added to the classpath.
    """
    jvm_options = confvalue("nimbus.childopts") + " -Dlogfile.name=nimbus.log"
    classpath_extras = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    exec_storm_class(
        "backtype.storm.daemon.nimbus",
        jvmtype="-server",
        extrajars=classpath_extras,
        childopts=jvm_options)
| |
def supervisor():
    """Start ``backtype.storm.daemon.supervisor`` on a server JVM.

    JVM options are the configured ``nimbus.childopts`` followed by
    ``-Dlogfile.name=supervisor.log``; ``STORM_DIR/log4j`` and
    ``STORM_DIR/conf`` are added to the classpath.
    """
    # NOTE(review): this reads the *nimbus* childopts key for the supervisor
    # daemon -- possibly a copy-paste slip; confirm whether a
    # supervisor-specific key exists before changing it.
    jvm_options = confvalue("nimbus.childopts") + " -Dlogfile.name=supervisor.log"
    classpath_extras = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    exec_storm_class(
        "backtype.storm.daemon.supervisor",
        jvmtype="-server",
        extrajars=classpath_extras,
        childopts=jvm_options)
| |
def ui():
    """Start ``backtype.storm.ui.core`` on a server JVM.

    Uses the fixed JVM options ``-Xmx768m -Dlogfile.name=ui.log`` and adds
    ``STORM_DIR/log4j``, ``STORM_DIR`` and ``STORM_DIR/conf`` to the classpath.
    """
    classpath_extras = [STORM_DIR + "/log4j", STORM_DIR, STORM_DIR + "/conf"]
    exec_storm_class(
        "backtype.storm.ui.core",
        jvmtype="-server",
        childopts="-Xmx768m -Dlogfile.name=ui.log",
        extrajars=classpath_extras)
| |
| |
# Dispatch table: sub-command name -> function implementing it.
COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "supervisor": supervisor}

# Reject unknown sub-commands with a readable message instead of a KeyError
# traceback, then run the selected sub-command with the remaining arguments.
if COMMAND not in COMMANDS:
    sys.exit("Unknown command: " + COMMAND
             + "\nAvailable commands: " + ", ".join(sorted(COMMANDS)))
COMMANDS[COMMAND](*ARGS)