blob: b5513577ce5d73e1822ac9a0ec796204ebfe726c [file] [log] [blame]
#!/usr/bin/python
import os
import sys
import random
import subprocess as sub
def identity(x):
    """Return ``x`` unchanged."""
    return x
def cygpath(x):
    """Run ``cygpath -wp`` on ``x`` and return the first line it prints.

    The external ``cygpath`` program must be available on PATH.
    """
    proc = sub.Popen(["cygpath", "-wp", x], stdout=sub.PIPE)
    stdout_data = proc.communicate()[0]
    # Only the first output line is of interest.
    return stdout_data.split("\n")[0]
# Classpath normalizer: routed through cygpath when running under Cygwin,
# left untouched on every other platform.
normclasspath = cygpath if sys.platform == "cygwin" else identity
# Per-user configuration directory.
CONF_DIR = os.path.expanduser("~/.storm")

# Absolute path of this script with its last two "/"-separated components
# removed, i.e. the parent of the directory that contains the script.
STORM_DIR = "/".join(os.path.abspath(__file__).split("/")[:-2])

if not os.path.exists(STORM_DIR + "/RELEASE"):
    # No RELEASE file under STORM_DIR: print the notice and exit with status 1.
    print("\n".join([
        "******************************************",
        "The storm client can only be run from within a release. You appear to be trying to run the client from a checkout of Storm's source code.",
        "\nYou can download a Storm release at https://github.com/nathanmarz/storm/downloads",
        "******************************************",
    ]))
    sys.exit(1)
def get_jars_full(adir):
    """Return ``adir + "/" + name`` for every entry of ``adir`` ending in ".jar".

    Entries appear in the order ``os.listdir`` reports them.
    """
    return [adir + "/" + entry
            for entry in os.listdir(adir)
            if entry.endswith(".jar")]
def get_classpath(extrajars):
    """Build the classpath string for launching Storm classes.

    The entries are, in order: jars directly under STORM_DIR, jars under
    STORM_DIR/lib, then ``extrajars``. They are joined with ":" and the
    result is passed through ``normclasspath``.
    """
    release_jars = get_jars_full(STORM_DIR)
    lib_jars = get_jars_full(STORM_DIR + "/lib")
    entries = release_jars + lib_jars + list(extrajars)
    return normclasspath(":".join(entries))
def confvalue(name, extrapaths):
    """Look up the configuration value ``name`` by running a helper JVM.

    Runs ``backtype.storm.command.config_value`` with a classpath extended
    by ``extrapaths`` and scans its standard output for the first line whose
    first space-separated token is ``VALUE:``.

    Returns everything after that marker, so values that themselves contain
    spaces (for example several JVM options) are returned whole. Returns ""
    when no such line is printed, so callers can always concatenate the
    result with other strings.
    """
    command = ["java", "-client", "-cp", get_classpath(extrapaths),
               "backtype.storm.command.config_value", name]
    proc = sub.Popen(command, stdout=sub.PIPE)
    output = proc.communicate()[0]
    for line in output.split("\n"):
        tokens = line.split(" ")
        if tokens[0] == "VALUE:":
            # Re-join the remainder: the value may contain spaces, and a
            # bare "VALUE:" line yields "" instead of an IndexError.
            return " ".join(tokens[1:])
    return ""
def print_localconfvalue(name):
    """Print ``name: value`` with the value looked up using CONF_DIR on the classpath."""
    value = confvalue(name, [CONF_DIR])
    print(name + ": " + value)
def print_remoteconfvalue(name):
    """Print ``name: value`` with the value looked up using STORM_DIR/conf on the classpath."""
    value = confvalue(name, [STORM_DIR + "/conf"])
    print(name + ": " + value)
def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=(), args=()):
    """Run the Java class ``klass`` in a new JVM via the shell.

    klass     -- name of the class to run
    jvmtype   -- flag placed directly after ``java``
    childopts -- extra JVM options; inserted verbatim so that a single
                 string can carry several space-separated options
    extrajars -- additional classpath entries, placed after the release jars
    args      -- arguments handed to the class

    The full command line is printed before it is executed.
    """
    # shlex.quote where available, pipes.quote on older interpreters.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    nativepath = confvalue("java.library.path", extrajars)
    # The command goes through a shell, so every piece that is meant to be
    # exactly one argument is quoted; otherwise a path or argument holding
    # spaces or shell metacharacters would be split or re-interpreted.
    # quote() leaves strings made only of safe characters unchanged.
    command = ("java " + jvmtype
               + " " + quote("-Djava.library.path=" + nativepath)
               + " " + childopts
               + " -cp " + quote(get_classpath(extrajars))
               + " " + quote(klass)
               + " " + " ".join([quote(arg) for arg in args]))
    print("Running: " + command)
    os.system(command)
def jar(jarfile, klass, *args):
    """Run ``klass`` in a client JVM with ``jarfile`` on the classpath.

    ``jarfile`` is also passed to the JVM as ``-Dstorm.jar``; ``args`` are
    forwarded to the class.
    """
    classpath_extras = [jarfile, CONF_DIR, STORM_DIR + "/bin"]
    jar_opt = "-Dstorm.jar=" + jarfile
    exec_storm_class(klass, jvmtype="-client", childopts=jar_opt,
                     extrajars=classpath_extras, args=args)
def kill(*args):
    """Run backtype.storm.command.kill_topology in a client JVM, forwarding ``args``."""
    classpath_extras = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class("backtype.storm.command.kill_topology",
                     jvmtype="-client", extrajars=classpath_extras, args=args)
def activate(*args):
    """Run backtype.storm.command.activate in a client JVM, forwarding ``args``."""
    classpath_extras = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class("backtype.storm.command.activate",
                     jvmtype="-client", extrajars=classpath_extras, args=args)
def deactivate(*args):
    """Run backtype.storm.command.deactivate in a client JVM, forwarding ``args``."""
    classpath_extras = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class("backtype.storm.command.deactivate",
                     jvmtype="-client", extrajars=classpath_extras, args=args)
def rebalance(*args):
    """Run backtype.storm.command.rebalance in a client JVM, forwarding ``args``."""
    classpath_extras = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class("backtype.storm.command.rebalance",
                     jvmtype="-client", extrajars=classpath_extras, args=args)
def shell(resourcesdir, command, *args):
    """Package ``resourcesdir`` into a temporary jar and run shell_submission.

    A jar with a randomized name is created in the current directory with
    ``jar cf``. backtype.storm.command.shell_submission is then run with the
    jar path, ``command`` and ``args`` as its arguments, and the jar is
    removed afterwards.

    Raises subprocess.CalledProcessError if ``jar`` exits with a non-zero
    status (nothing is submitted in that case) and OSError if ``jar`` cannot
    be started.
    """
    tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
    try:
        # An argument list (no shell) keeps a directory name that contains
        # spaces or shell metacharacters intact.
        sub.check_call(["jar", "cf", tmpjarpath, resourcesdir])
        runnerargs = [tmpjarpath, command]
        runnerargs.extend(args)
        exec_storm_class("backtype.storm.command.shell_submission", args=runnerargs, jvmtype="-client", extrajars=[CONF_DIR])
    finally:
        # Always clean up; the jar may not exist if its creation failed.
        if os.path.exists(tmpjarpath):
            os.remove(tmpjarpath)
def repl():
    """Run clojure.lang.Repl in a client JVM with STORM_DIR/conf on the classpath."""
    exec_storm_class("clojure.lang.Repl", jvmtype="-client",
                     extrajars=[STORM_DIR + "/conf"])
def nimbus():
    """Run backtype.storm.daemon.nimbus in a server JVM.

    JVM options are the configured ``nimbus.childopts`` followed by the
    log file name (nimbus.log) and log4j configuration settings.
    """
    classpath_extras = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    logging_opts = " -Dlogfile.name=nimbus.log -Dlog4j.configuration=storm.log.properties"
    jvm_opts = confvalue("nimbus.childopts", classpath_extras) + logging_opts
    exec_storm_class("backtype.storm.daemon.nimbus", jvmtype="-server",
                     extrajars=classpath_extras, childopts=jvm_opts)
def supervisor():
    """Run backtype.storm.daemon.supervisor in a server JVM.

    JVM options are a configured childopts value followed by the log file
    name (supervisor.log) and log4j configuration settings.
    """
    classpath_extras = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    logging_opts = " -Dlogfile.name=supervisor.log -Dlog4j.configuration=storm.log.properties"
    # NOTE(review): this reads "nimbus.childopts", not a supervisor-specific
    # key -- confirm whether that is intentional.
    jvm_opts = confvalue("nimbus.childopts", classpath_extras) + logging_opts
    exec_storm_class("backtype.storm.daemon.supervisor", jvmtype="-server",
                     extrajars=classpath_extras, childopts=jvm_opts)
def ui():
    """Run backtype.storm.ui.core in a server JVM with fixed JVM options.

    STORM_DIR/log4j, STORM_DIR itself and STORM_DIR/conf are added to the
    classpath.
    """
    jvm_opts = "-Xmx768m -Dlogfile.name=ui.log -Dlog4j.configuration=storm.log.properties"
    classpath_extras = [STORM_DIR + "/log4j", STORM_DIR, STORM_DIR + "/conf"]
    exec_storm_class("backtype.storm.ui.core", jvmtype="-server",
                     childopts=jvm_opts, extrajars=classpath_extras)
def drpc():
    """Run backtype.storm.daemon.drpc in a server JVM with fixed JVM options.

    STORM_DIR/log4j and STORM_DIR/conf are added to the classpath.
    """
    jvm_opts = "-Xmx768m -Dlogfile.name=drpc.log -Dlog4j.configuration=storm.log.properties"
    classpath_extras = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    exec_storm_class("backtype.storm.daemon.drpc", jvmtype="-server",
                     childopts=jvm_opts, extrajars=classpath_extras)
def print_classpath():
    """Print the classpath built with no extra entries."""
    classpath = get_classpath([])
    print(classpath)
# Maps each command name accepted on the command line to its handler.
COMMANDS = {
    "jar": jar,
    "kill": kill,
    "shell": shell,
    "nimbus": nimbus,
    "ui": ui,
    "drpc": drpc,
    "supervisor": supervisor,
    "localconfvalue": print_localconfvalue,
    "remoteconfvalue": print_remoteconfvalue,
    "repl": repl,
    "classpath": print_classpath,
    "activate": activate,
    "deactivate": deactivate,
    "rebalance": rebalance,
}
def print_commands():
    """Print the command names in sorted order, one per line, then a
    pointer to the client documentation."""
    # sorted() + join replaces the in-place sort of keys() and the
    # hand-rolled reduce; the text printed is the same.
    print("Commands:\n\t" + "\n\t".join(sorted(COMMANDS)))
    print("\nDocumentation for the storm client can be found at https://github.com/nathanmarz/storm/wiki/Command-line-client\n")
def print_usage(msg=None):
    """Print ``msg`` when one is given, then the list of commands."""
    # Identity test: None is a singleton, and "!=" can be overridden.
    if msg is not None:
        print(msg)
    print_commands()
def main():
    """Dispatch to the handler named by the first command-line argument.

    The remaining arguments are passed to the handler positionally. With no
    arguments, or with a name that is not in COMMANDS, usage is printed and
    the process exits with status -1.
    """
    if len(sys.argv) <= 1:
        print_usage()
        sys.exit(-1)
    command = sys.argv[1]
    args = sys.argv[2:]
    handler = COMMANDS.get(command)
    if handler is None:
        # Report an unknown command instead of dying with a KeyError traceback.
        print_usage("Unknown command: " + command)
        sys.exit(-1)
    handler(*args)


if __name__ == "__main__":
    main()