Merge pull request #3135 from govind-menon/STORM-3515
STORM-3515: Fixes config override flag parsing in Storm CLI
diff --git a/bin/storm.py b/bin/storm.py
index e1b47f6..9c45c26 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -81,9 +81,10 @@
cmd = os.path.join(JAVA_HOME, 'bin', cmd)
return cmd
-def confvalue(name, storm_config_opts, extrapaths, daemon=True):
+def confvalue(name, storm_config_opts, extrapaths, overriding_conf_file=None, daemon=True):
command = [
- JAVA_CMD, "-client", get_config_opts(storm_config_opts), "-Dstorm.conf.file=" + CONF_FILE,
+ JAVA_CMD, "-client", get_config_opts(storm_config_opts),
+ "-Dstorm.conf.file=" + (overriding_conf_file if overriding_conf_file else ""),
"-cp", get_classpath(extrajars=extrapaths, daemon=daemon), "org.apache.storm.command.ConfigValue", name
]
output = subprocess.Popen(command, stdout=subprocess.PIPE).communicate()[0]
@@ -229,9 +230,12 @@
raise RuntimeError("dependency handler returns non-json response: sysout<%s>", output)
-def exec_storm_class(klass, storm_config_opts, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False, daemon=True, client=False, daemonName=""):
- storm_log_dir = confvalue("storm.log.dir", storm_config_opts=storm_config_opts, extrapaths=[CLUSTER_CONF_DIR])
- if(storm_log_dir == None or storm_log_dir == "null"):
+def exec_storm_class(klass, storm_config_opts, jvmtype="-server", jvmopts=[],
+ extrajars=[], args=[], fork=False, daemon=True, client=False, daemonName="",
+ overriding_conf_file=None):
+ storm_log_dir = confvalue("storm.log.dir", storm_config_opts=storm_config_opts,
+ extrapaths=[CLUSTER_CONF_DIR], overriding_conf_file=overriding_conf_file)
+ if storm_log_dir is None or storm_log_dir in ["null", ""]:
storm_log_dir = os.path.join(STORM_DIR, "logs")
all_args = [
JAVA_CMD, jvmtype,
@@ -240,7 +244,7 @@
"-Dstorm.home=" + STORM_DIR,
"-Dstorm.log.dir=" + storm_log_dir,
"-Djava.library.path=" + confvalue("java.library.path", storm_config_opts, extrajars, daemon=daemon),
- "-Dstorm.conf.file=" + CONF_FILE,
+ "-Dstorm.conf.file=" + (overriding_conf_file if overriding_conf_file else ""),
"-cp", get_classpath(extrajars, daemon, client=client),
] + jvmopts + [klass] + list(args)
print("Running: " + " ".join(all_args))
@@ -278,24 +282,26 @@
klass, args.storm_config_opts,
jvmtype="-client",
extrajars=extra_jars,
- args=args.topology_main_args,
+ args=args.main_args,
daemon=False,
jvmopts=JAR_JVM_OPTS + extrajvmopts + ["-Dstorm.jar=" + jarfile] +
["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
- ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
+ ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)],
+ overriding_conf_file=args.config)
def print_localconfvalue(args):
- print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts, [USER_CONF_DIR]))
+ print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts,
+ [USER_CONF_DIR], overriding_conf_file=args.config))
def print_remoteconfvalue(args):
- print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts, [CLUSTER_CONF_DIR]))
+ print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts,
+ [CLUSTER_CONF_DIR], overriding_conf_file=args.config))
def initialize_main_command():
main_parser = argparse.ArgumentParser(prog="storm", formatter_class=SortingHelpFormatter)
- add_common_options(main_parser)
subparsers = main_parser.add_subparsers(help="")
@@ -360,12 +366,25 @@
add_common_options(sub_parser)
-def add_common_options(parser):
+def add_common_options(parser, main_args=True):
parser.add_argument("--config", default=None, help="Override default storm conf file")
parser.add_argument(
"-storm_config_opts", "-c", action="append", default=[],
help="Override storm conf properties , e.g. nimbus.ui.port=4443"
)
+ if main_args:
+ parser.add_argument(
+ "main_args", metavar="main_args",
+ nargs='*', help="Runs the main method with the specified arguments."
+ )
+
+def remove_common_options(sys_args):
+ flags_to_filter = ["-c", "-storm_config_opts", "--config"]  # also matched in the "flag=value" form below
+ filtered_sys_args = [
+ sys_args[i] for i in range(0, len(sys_args)) if (not (sys_args[i].split("=", 1)[0] in flags_to_filter) and ((i<1) or
+ not (sys_args[i - 1] in flags_to_filter)))
+ ]
+ return filtered_sys_args
def add_topology_jar_options(parser):
parser.add_argument(
@@ -376,10 +395,6 @@
"topology_main_class", metavar="topology-main-class",
help="main class of the topology jar being submitted"
)
- parser.add_argument(
- "topology_main_args", metavar="topology_main_args",
- nargs='*', help="Runs the main method with the specified arguments."
- )
def add_client_jar_options(parser):
@@ -510,12 +525,6 @@
raise argparse.ArgumentTypeError("%s is not a positive integer" % value)
return ivalue
-def check_even_list(cred_list):
- if not (len(cred_list) % 2):
- raise argparse.ArgumentTypeError("please provide a list of cred key and value pairs")
- return cred_list
-
-
def initialize_upload_credentials_subcommand(subparsers):
command_help = """Uploads a new set of credentials to a running topology."""
sub_parser = subparsers.add_parser("upload-credentials", help=command_help, formatter_class=SortingHelpFormatter)
@@ -533,8 +542,7 @@
)
sub_parser.add_argument(
- "cred_list", nargs='*', help="List of credkeys and their values [credkey credvalue]*",
- type=check_even_list
+ "cred_list", nargs='*', help="List of credkeys and their values [credkey credvalue]*"
)
sub_parser.set_defaults(func=upload_credentials)
@@ -563,7 +571,7 @@
group.add_argument("--explain", action="store_true", help="activate explain mode")
sub_parser.set_defaults(func=sql)
- add_common_options(sub_parser)
+ add_common_options(sub_parser, main_args=False)
def initialize_blobstore_subcommand(subparsers):
@@ -581,7 +589,7 @@
)
list_parser.add_argument(
"keys", nargs='+')
- add_common_options(list_parser)
+ add_common_options(list_parser, main_args=False)
cat_parser = sub_sub_parsers.add_parser(
"cat", help="read a blob and then either write it to a file, or STDOUT (requires read access).", formatter_class=SortingHelpFormatter
@@ -892,7 +900,7 @@
sub_parser.add_argument("args", nargs='*', default=[])
sub_parser.set_defaults(func=shell)
- add_common_options(sub_parser)
+ add_common_options(sub_parser, main_args=False)
def initialize_repl_subcommand(subparsers):
@@ -987,7 +995,7 @@
sub_parser.add_argument("function_arguments", nargs='*', default=[])
sub_parser.set_defaults(func=drpc_client)
- add_common_options(sub_parser)
+ add_common_options(sub_parser, main_args=False)
def initialize_drpc_subcommand(subparsers):
@@ -1061,7 +1069,7 @@
extrajvmopts = ["-Dstorm.local.sleeptime=" + args.local_ttl]
if args.java_debug:
extrajvmopts += ["-agentlib:jdwp=" + args.java_debug]
- args.topology_main_args = [args.topology_main_class] + args.topology_main_args
+ args.main_args = [args.topology_main_class] + args.main_args
run_client_jar(
"org.apache.storm.LocalCluster", args,
client=False, daemon=False, extrajvmopts=extrajvmopts)
@@ -1101,23 +1109,28 @@
args=sql_args,
daemon=False,
jvmopts=["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
- ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
+ ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)],
+ overriding_conf_file=args.config)
def kill(args):
exec_storm_class(
"org.apache.storm.command.KillTopology",
- args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def upload_credentials(args):
+ if (len(args.cred_list) % 2 != 0):  # credentials must arrive as key/value pairs
+ raise argparse.ArgumentTypeError("please provide a list of cred key and value pairs " + str(args.cred_list))
exec_storm_class(
"org.apache.storm.command.UploadCredentials",
- args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def blob(args):
@@ -1125,32 +1138,36 @@
raise argparse.ArgumentTypeError("Replication factor needed when doing blob update")
exec_storm_class(
"org.apache.storm.command.Blobstore",
- args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def heartbeats(args):
exec_storm_class(
"org.apache.storm.command.Heartbeats",
- args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def activate(args):
exec_storm_class(
"org.apache.storm.command.Activate",
- args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def listtopos(args):
exec_storm_class(
"org.apache.storm.command.ListTopologies",
- args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def set_log_level(args):
for log_level in args.l:
@@ -1163,16 +1180,18 @@
raise argparse.ArgumentTypeError("Should be in the form[logger name]=[log level][:optional timeout]")
exec_storm_class(
"org.apache.storm.command.SetLogLevel",
- args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def deactivate(args):
exec_storm_class(
"org.apache.storm.command.Deactivate",
- args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def rebalance(args):
@@ -1186,41 +1205,46 @@
raise argparse.ArgumentTypeError("Should be in the form component_name:new_executor_count")
exec_storm_class(
"org.apache.storm.command.Rebalance",
- args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def get_errors(args):
exec_storm_class(
"org.apache.storm.command.GetErrors",
- args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def healthcheck(args):
exec_storm_class(
"org.apache.storm.command.HealthCheck",
- args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def kill_workers(args):
exec_storm_class(
"org.apache.storm.command.KillWorkers",
- args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def admin(args):
exec_storm_class(
"org.apache.storm.command.AdminCommands",
- args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def shell(args):
@@ -1233,21 +1257,24 @@
args=runnerargs,
jvmtype="-client",
extrajars=[USER_CONF_DIR],
- fork=True)
+ fork=True,
+ overriding_conf_file=args.config)
os.system("rm " + tmpjarpath)
def repl(args):
cppaths = [CLUSTER_CONF_DIR]
exec_storm_class(
- "clojure.main", storm_config_opts=args.storm_config_opts, jvmtype="-client", extrajars=cppaths
+ "clojure.main", storm_config_opts=args.storm_config_opts, jvmtype="-client", extrajars=cppaths,
+ overriding_conf_file=args.config
)
-def get_log4j2_conf_dir(storm_config_opts):
+def get_log4j2_conf_dir(storm_config_opts, args):
cppaths = [CLUSTER_CONF_DIR]
storm_log4j2_conf_dir = confvalue(
- "storm.log4j2.conf.dir", storm_config_opts=storm_config_opts, extrapaths=cppaths
+ "storm.log4j2.conf.dir", storm_config_opts=storm_config_opts,
+ extrapaths=cppaths, overriding_conf_file=args.config
)
if(not storm_log4j2_conf_dir or storm_log4j2_conf_dir == "null"):
storm_log4j2_conf_dir = STORM_LOG4J2_CONF_DIR
@@ -1260,18 +1287,19 @@
cppaths = [CLUSTER_CONF_DIR]
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = shlex.split(confvalue(
- "nimbus.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths
+ "nimbus.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths, overriding_conf_file=args.config
)) + [
"-Djava.deserialization.disabled=true",
"-Dlogfile.name=nimbus.log",
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml"),
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml"),
]
exec_storm_class(
"org.apache.storm.daemon.nimbus.Nimbus", storm_config_opts=args.storm_config_opts,
jvmtype="-server",
daemonName="nimbus",
extrajars=cppaths,
- jvmopts=jvmopts)
+ jvmopts=jvmopts,
+ overriding_conf_file=args.config)
def pacemaker(args):
@@ -1279,47 +1307,51 @@
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = shlex.split(confvalue(
- "pacemaker.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths)
+ "pacemaker.childopts", storm_config_opts=storm_config_opts,
+ extrapaths=cppaths, overriding_conf_file=args.config)
) + [
"-Djava.deserialization.disabled=true",
"-Dlogfile.name=pacemaker.log",
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml"),
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml"),
]
exec_storm_class(
"org.apache.storm.pacemaker.Pacemaker", storm_config_opts=args.storm_config_opts,
jvmtype="-server",
daemonName="pacemaker",
extrajars=cppaths,
- jvmopts=jvmopts)
+ jvmopts=jvmopts,
+ overriding_conf_file=args.config)
def supervisor(args):
cppaths = [CLUSTER_CONF_DIR]
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = shlex.split(confvalue(
- "supervisor.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths)
+ "supervisor.childopts", storm_config_opts=storm_config_opts,
+ extrapaths=cppaths, overriding_conf_file=args.config)
) + [
"-Djava.deserialization.disabled=true",
"-Dlogfile.name=" + STORM_SUPERVISOR_LOG_FILE,
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml"),
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml"),
]
exec_storm_class(
"org.apache.storm.daemon.supervisor.Supervisor", storm_config_opts=args.storm_config_opts,
jvmtype="-server",
daemonName="supervisor",
extrajars=cppaths,
- jvmopts=jvmopts)
+ jvmopts=jvmopts,
+ overriding_conf_file=args.config)
def ui(args):
cppaths = [CLUSTER_CONF_DIR]
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = shlex.split(confvalue(
- "ui.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths)
+ "ui.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths, overriding_conf_file=args.config)
) + [
"-Djava.deserialization.disabled=true",
"-Dlogfile.name=ui.log",
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
]
allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR)
@@ -1329,7 +1361,8 @@
jvmtype="-server",
daemonName="ui",
jvmopts=jvmopts,
- extrajars=allextrajars)
+ extrajars=allextrajars,
+ overriding_conf_file=args.config)
def logviewer(args):
@@ -1337,12 +1370,13 @@
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = shlex.split(
confvalue(
- "logviewer.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths
+ "logviewer.childopts", storm_config_opts=storm_config_opts,
+ extrapaths=cppaths, overriding_conf_file=args.config
)
) + [
"-Djava.deserialization.disabled=true",
"-Dlogfile.name=logviewer.log",
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
]
allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR)
@@ -1352,7 +1386,8 @@
jvmtype="-server",
daemonName="logviewer",
jvmopts=jvmopts,
- extrajars=allextrajars)
+ extrajars=allextrajars,
+ overriding_conf_file=args.config)
def drpc_client(args):
@@ -1366,9 +1401,10 @@
exec_storm_class(
"org.apache.storm.command.BasicDrpcClient",
- args=sys.argv[2],
+ args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+ extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+ overriding_conf_file=args.config)
def drpc(args):
@@ -1376,12 +1412,12 @@
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = shlex.split(
confvalue(
- "drpc.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths
+ "drpc.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths, overriding_conf_file=args.config
)
) + [
"-Djava.deserialization.disabled=true",
"-Dlogfile.name=drpc.log",
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
]
allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR)
allextrajars.append(CLUSTER_CONF_DIR)
@@ -1390,28 +1426,31 @@
jvmtype="-server",
daemonName="drpc",
jvmopts=jvmopts,
- extrajars=allextrajars)
+ extrajars=allextrajars,
+ overriding_conf_file=args.config)
def dev_zookeeper(args):
storm_config_opts = get_config_opts(args.storm_config_opts)
jvmopts = [
"-Dlogfile.name=dev-zookeeper.log",
- "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+ "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
]
exec_storm_class(
"org.apache.storm.command.DevZookeeper", storm_config_opts=args.storm_config_opts,
jvmtype="-server",
daemonName="dev_zookeeper",
jvmopts=jvmopts,
- extrajars=[CLUSTER_CONF_DIR])
+ extrajars=[CLUSTER_CONF_DIR],
+ overriding_conf_file=args.config)
def version(args):
exec_storm_class(
"org.apache.storm.utils.VersionInfo", storm_config_opts=args.storm_config_opts,
jvmtype="-client",
- extrajars=[CLUSTER_CONF_DIR])
+ extrajars=[CLUSTER_CONF_DIR],
+ overriding_conf_file=args.config)
def print_classpath(args):
@@ -1425,7 +1464,7 @@
def monitor(args):
exec_storm_class(
"org.apache.storm.command.Monitor", storm_config_opts=args.storm_config_opts,
- args=sys.argv[2],
+ args=remove_common_options(sys.argv[2:]), overriding_conf_file=args.config,
jvmtype="-client",
extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
diff --git a/storm-client/test/py/test_storm_cli.py b/storm-client/test/py/test_storm_cli.py
index 1069a6b..8084452 100644
--- a/storm-client/test/py/test_storm_cli.py
+++ b/storm-client/test/py/test_storm_cli.py
@@ -42,10 +42,11 @@
)
def base_test(self, command_invocation, mock_shell_interface, expected_output):
+ print(command_invocation)
with mock.patch.object(sys, "argv", command_invocation):
self.cli_main()
if expected_output not in mock_shell_interface.call_args_list:
- print("Expected:" + str(expected_output))
+ print("Expected:" + str(expected_output))
print("Got:" + str(mock_shell_interface.call_args_list[-1]))
assert expected_output in mock_shell_interface.call_args_list
@@ -59,7 +60,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client','-Ddaemon.name=', '-Dstorm.options=+topology.blobstore.map%3D%27%7B%22key1%22%3A%7B%22localname%22%3A%22blob_file%22%2C+%22uncompress%22%3Afalse%7D%2C%22key2%22%3A%7B%7D%7D%27',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:example/storm-starter/storm-starter-topologies-*.jar:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin:./external/storm-redis/storm-redis-1.1.0.jar:./external/storm-kafka-client/storm-kafka-client-1.1.0.jar"', '-Dstorm.jar=example/storm-starter/storm-starter-topologies-*.jar', '-Dstorm.dependency.jars=./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka-client/storm-kafka-client-1.1.0.jar"', '-Dstorm.dependency.artifacts={}',
'org.apache.storm.starter.RollingTopWords', 'blobstore-remote2', 'remote'
@@ -99,7 +100,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client','-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:example/storm-starter/storm-starter-topologies-*.jar:' + self.storm_dir +
@@ -120,7 +121,7 @@
], self.mock_execvp, mock.call(
self.java_cmd,
[self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' +
self.storm_dir +
@@ -137,7 +138,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
'/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.KillTopology', 'doomed_topology'
@@ -146,17 +147,18 @@
def test_upload_credentials_command(self):
self.base_test([
- 'storm', 'upload-credentials', 'my-topology-name', 'appids role.name1,role.name2"'
+ 'storm', 'upload-credentials', '--config', '/some/other/storm.yaml', '-c', 'test=test', 'my-topology-name', 'appids', 'role.name1,role.name2'
], self.mock_execvp, mock.call(
self.java_cmd, [
- self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=', '-Djava.library.path=',
- '-Dstorm.conf.file=', '-cp', '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' +
+ self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=test%3Dtest',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
+ '-Djava.library.path=', '-Dstorm.conf.file=/some/other/storm.yaml',
+ '-cp', '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' +
self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' +
self.storm_dir + '/conf:' + self.storm_dir +
'/bin', 'org.apache.storm.command.UploadCredentials',
- 'my-topology-name', 'appids role.name1,role.name2"'])
+ 'my-topology-name', 'appids', 'role.name1,role.name2'])
)
def test_blobstore_command(self):
@@ -165,7 +167,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -180,7 +182,7 @@
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
'-Dstorm.home=' + self.storm_dir + '',
- '-Dstorm.log.dir=', '-Djava.library.path=',
+ '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=',
'-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -193,7 +195,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -207,7 +209,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -220,7 +222,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
'/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin',
@@ -234,7 +236,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
'/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir +
@@ -248,7 +250,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
'/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir +
@@ -262,7 +264,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
'/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir +
@@ -276,7 +278,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-server', '-Ddaemon.name=nimbus', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf',
@@ -292,7 +294,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-server', '-Ddaemon.name=supervisor', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf',
@@ -308,7 +310,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-server', '-Ddaemon.name=pacemaker', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf',
@@ -324,7 +326,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-server', '-Ddaemon.name=ui', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir +
@@ -341,7 +343,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-server', '-Ddaemon.name=logviewer', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir +
@@ -358,7 +360,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-server', '-Ddaemon.name=drpc', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
'-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
'' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
'/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir +
@@ -375,7 +377,7 @@
], self.mock_execvp, mock.call(
self.java_cmd, [
self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
- '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=', '-Djava.library.path=',
+ '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=',
'-Dstorm.conf.file=', '-cp', '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' +
self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
self.storm_dir + '/bin', 'org.apache.storm.command.HealthCheck'