blob: a9e5c26723361ebaff252d61f7e1c2aa2949e63e [file] [log] [blame]
#!/usr/bin/python
import os
import sys
import random
import subprocess as sub
CONF_DIR = os.path.expanduser("~/.storm")
STORM_DIR = "/".join(os.path.abspath( __file__ ).split("/")[:-2])
def get_jars_full(adir):
files = os.listdir(adir)
ret = []
for f in files:
if f.endswith(".jar"):
ret.append(adir + "/" + f)
return ret
def get_classpath(extrajars):
ret = get_jars_full(STORM_DIR)
ret.extend(get_jars_full(STORM_DIR + "/lib"))
ret.extend(extrajars)
return ":".join(ret)
def confvalue(name):
cp = get_classpath([])
command = ["java", "-client", "-cp", cp, "backtype.storm.command.config_value", name]
p = sub.Popen(command,stdout=sub.PIPE)
output, errors = p.communicate()
lines = output.split("\n")
for line in lines:
tokens = line.split(" ")
if tokens[0] == "VALUE:":
return tokens[1]
def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=[], args=[], prefix=""):
nativepath = confvalue("java.library.path")
command = prefix + " java " + jvmtype + " -Djava.library.path=" + nativepath + " " + childopts + " -cp " + get_classpath(extrajars) + " " + klass + " " + " ".join(args)
print "Running: " + command
os.system(command)
COMMAND = sys.argv[1]
ARGS = sys.argv[2:]
def jar(jarfile, klass, *args):
exec_storm_class(
klass,
childopts="-Dlog4j.configuration=storm.log.properties",
jvmtype="-client",
extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
args=args,
prefix="export STORM_JAR=" + jarfile + ";")
def kill(name):
exec_storm_class("backtype.storm.command.kill_topology", args=[name], jvmtype="-client", extrajars=[CONF_DIR, STORM_DIR + "/bin"], childopts="-Dlog4j.configuration=storm.log.properties")
def shell(resourcesdir, command, *args):
tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
os.system("jar cf %s %s" % (tmpjarpath, resourcesdir))
runnerargs = [tmpjarpath, command]
runnerargs.extend(args)
exec_storm_class("backtype.storm.command.shell_submission", args=runnerargs, jvmtype="-client", extrajars=[CONF_DIR], childopts="-Dlog4j.configuration=storm.log.properties")
os.system("rm " + tmpjarpath)
def nimbus():
childopts = confvalue("nimbus.childopts") + " -Dlogfile.name=nimbus.log"
exec_storm_class("backtype.storm.daemon.nimbus", jvmtype="-server", extrajars=[STORM_DIR + "/log4j", STORM_DIR + "/conf"], childopts=childopts)
def supervisor():
childopts = confvalue("nimbus.childopts") + " -Dlogfile.name=supervisor.log"
exec_storm_class("backtype.storm.daemon.supervisor", jvmtype="-server", extrajars=[STORM_DIR + "/log4j", STORM_DIR + "/conf"], childopts=childopts)
def ui():
childopts = "-Xmx768m -Dlogfile.name=ui.log"
exec_storm_class("backtype.storm.ui.core", jvmtype="-server", childopts=childopts, extrajars=[STORM_DIR + "/log4j", STORM_DIR, STORM_DIR + "/conf"])
COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "supervisor": supervisor}
COMMANDS[COMMAND](*ARGS)