Merge pull request #3135 from govind-menon/STORM-3515

STORM-3515: Fixes config override flag parsing in Storm CLI
diff --git a/bin/storm.py b/bin/storm.py
index e1b47f6..9c45c26 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -81,9 +81,10 @@
         cmd = os.path.join(JAVA_HOME, 'bin', cmd)
     return cmd
 
-def confvalue(name, storm_config_opts, extrapaths, daemon=True):
+def confvalue(name, storm_config_opts, extrapaths, overriding_conf_file=None, daemon=True):
     command = [
-        JAVA_CMD, "-client", get_config_opts(storm_config_opts), "-Dstorm.conf.file=" + CONF_FILE,
+        JAVA_CMD, "-client", get_config_opts(storm_config_opts),
+        "-Dstorm.conf.file=" + (overriding_conf_file if overriding_conf_file else ""),
         "-cp", get_classpath(extrajars=extrapaths, daemon=daemon), "org.apache.storm.command.ConfigValue", name
     ]
     output = subprocess.Popen(command, stdout=subprocess.PIPE).communicate()[0]
@@ -229,9 +230,12 @@
         raise RuntimeError("dependency handler returns non-json response: sysout<%s>", output)
 
 
-def exec_storm_class(klass, storm_config_opts, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False, daemon=True, client=False, daemonName=""):
-    storm_log_dir = confvalue("storm.log.dir", storm_config_opts=storm_config_opts, extrapaths=[CLUSTER_CONF_DIR])
-    if(storm_log_dir == None or storm_log_dir == "null"):
+def exec_storm_class(klass, storm_config_opts, jvmtype="-server", jvmopts=[],
+                     extrajars=[], args=[], fork=False, daemon=True, client=False, daemonName="",
+                     overriding_conf_file=None):
+    storm_log_dir = confvalue("storm.log.dir", storm_config_opts=storm_config_opts,
+                              extrapaths=[CLUSTER_CONF_DIR], overriding_conf_file=overriding_conf_file)
+    if storm_log_dir is None or storm_log_dir in ["null", ""]:
         storm_log_dir = os.path.join(STORM_DIR, "logs")
     all_args = [
         JAVA_CMD, jvmtype,
@@ -240,7 +244,7 @@
        "-Dstorm.home=" + STORM_DIR,
        "-Dstorm.log.dir=" + storm_log_dir,
        "-Djava.library.path=" + confvalue("java.library.path", storm_config_opts, extrajars, daemon=daemon),
-       "-Dstorm.conf.file=" + CONF_FILE,
+       "-Dstorm.conf.file=" + (overriding_conf_file if overriding_conf_file else ""),
        "-cp", get_classpath(extrajars, daemon, client=client),
     ] + jvmopts + [klass] + list(args)
     print("Running: " + " ".join(all_args))
@@ -278,24 +282,26 @@
         klass, args.storm_config_opts,
         jvmtype="-client",
         extrajars=extra_jars,
-        args=args.topology_main_args,
+        args=args.main_args,
         daemon=False,
         jvmopts=JAR_JVM_OPTS + extrajvmopts + ["-Dstorm.jar=" + jarfile] +
                 ["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
-                ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
+                ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)],
+        overriding_conf_file=args.config)
 
 
 def print_localconfvalue(args):
-    print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts, [USER_CONF_DIR]))
+    print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts,
+                                            [USER_CONF_DIR], overriding_conf_file=args.config))
 
 
 def print_remoteconfvalue(args):
-    print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts, [CLUSTER_CONF_DIR]))
+    print(args.conf_name + ": " + confvalue(args.conf_name, args.storm_config_opts,
+                                            [CLUSTER_CONF_DIR], overriding_conf_file=args.config))
 
 
 def initialize_main_command():
     main_parser = argparse.ArgumentParser(prog="storm", formatter_class=SortingHelpFormatter)
-    add_common_options(main_parser)
 
     subparsers = main_parser.add_subparsers(help="")
 
@@ -360,12 +366,25 @@
     add_common_options(sub_parser)
 
 
-def add_common_options(parser):
+def add_common_options(parser, main_args=True):
     parser.add_argument("--config", default=None, help="Override default storm conf file")
     parser.add_argument(
         "-storm_config_opts", "-c", action="append", default=[],
         help="Override storm conf properties , e.g. nimbus.ui.port=4443"
     )
+    if main_args:
+        parser.add_argument(
+            "main_args", metavar="main_args",
+            nargs='*', help="Runs the main method with the specified arguments."
+        )
+
+def remove_common_options(sys_args):
+    # Drop -c/-storm_config_opts/--config and their values, in both "flag value" and "flag=value" forms.
+    flags_to_filter = ("-c", "-storm_config_opts", "--config")
+    return [
+        arg for i, arg in enumerate(sys_args)
+        if arg.split("=", 1)[0] not in flags_to_filter and (i < 1 or sys_args[i - 1] not in flags_to_filter)
+    ]
 
 def add_topology_jar_options(parser):
     parser.add_argument(
@@ -376,10 +395,6 @@
         "topology_main_class", metavar="topology-main-class",
     help="main class of the topology jar being submitted"
     )
-    parser.add_argument(
-        "topology_main_args", metavar="topology_main_args",
-        nargs='*', help="Runs the main method with the specified arguments."
-    )
 
 
 def add_client_jar_options(parser):
@@ -510,12 +525,6 @@
         raise argparse.ArgumentTypeError("%s is not a positive integer" % value)
     return ivalue
 
-def check_even_list(cred_list):
-    if not (len(cred_list) % 2):
-        raise argparse.ArgumentTypeError("please provide a list of cred key and value pairs")
-    return cred_list
-
-
 def initialize_upload_credentials_subcommand(subparsers):
     command_help = """Uploads a new set of credentials to a running topology."""
     sub_parser = subparsers.add_parser("upload-credentials", help=command_help, formatter_class=SortingHelpFormatter)
@@ -533,8 +542,7 @@
     )
 
     sub_parser.add_argument(
-        "cred_list", nargs='*', help="List of credkeys and their values [credkey credvalue]*",
-        type=check_even_list
+        "cred_list", nargs='*', help="List of credkeys and their values [credkey credvalue]*"
     )
 
     sub_parser.set_defaults(func=upload_credentials)
@@ -563,7 +571,7 @@
     group.add_argument("--explain", action="store_true", help="activate explain mode")
 
     sub_parser.set_defaults(func=sql)
-    add_common_options(sub_parser)
+    add_common_options(sub_parser, main_args=False)
 
 
 def initialize_blobstore_subcommand(subparsers):
@@ -581,7 +589,7 @@
     )
     list_parser.add_argument(
         "keys", nargs='+')
-    add_common_options(list_parser)
+    add_common_options(list_parser, main_args=False)
 
     cat_parser = sub_sub_parsers.add_parser(
         "cat", help="read a blob and then either write it to a file, or STDOUT (requires read access).", formatter_class=SortingHelpFormatter
@@ -892,7 +900,7 @@
     sub_parser.add_argument("args", nargs='*', default=[])
 
     sub_parser.set_defaults(func=shell)
-    add_common_options(sub_parser)
+    add_common_options(sub_parser, main_args=False)
 
 
 def initialize_repl_subcommand(subparsers):
@@ -987,7 +995,7 @@
     sub_parser.add_argument("function_arguments", nargs='*', default=[])
 
     sub_parser.set_defaults(func=drpc_client)
-    add_common_options(sub_parser)
+    add_common_options(sub_parser, main_args=False)
 
 
 def initialize_drpc_subcommand(subparsers):
@@ -1061,7 +1069,7 @@
     extrajvmopts = ["-Dstorm.local.sleeptime=" + args.local_ttl]
     if args.java_debug:
         extrajvmopts += ["-agentlib:jdwp=" + args.java_debug]
-    args.topology_main_args = [args.topology_main_class] + args.topology_main_args
+    args.main_args = [args.topology_main_class] + args.main_args
     run_client_jar(
         "org.apache.storm.LocalCluster", args,
         client=False, daemon=False, extrajvmopts=extrajvmopts)
@@ -1101,23 +1109,28 @@
         args=sql_args,
         daemon=False,
         jvmopts=["-Dstorm.dependency.jars=" + ",".join(local_jars)] +
-                ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)])
+                ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)],
+        overriding_conf_file=args.config)
 
 
 def kill(args):
     exec_storm_class(
         "org.apache.storm.command.KillTopology",
-        args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def upload_credentials(args):
+    if len(args.cred_list) % 2 != 0:  # credentials must arrive as key/value pairs
+        raise argparse.ArgumentTypeError("please provide a list of cred key and value pairs " + str(args.cred_list))
     exec_storm_class(
         "org.apache.storm.command.UploadCredentials",
-        args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def blob(args):
@@ -1125,32 +1138,36 @@
         raise argparse.ArgumentTypeError("Replication factor needed when doing blob update")
     exec_storm_class(
         "org.apache.storm.command.Blobstore",
-        args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def heartbeats(args):
     exec_storm_class(
         "org.apache.storm.command.Heartbeats",
-        args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def activate(args):
     exec_storm_class(
         "org.apache.storm.command.Activate",
-        args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 def listtopos(args):
     exec_storm_class(
         "org.apache.storm.command.ListTopologies",
-        args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 def set_log_level(args):
     for log_level in args.l:
@@ -1163,16 +1180,18 @@
             raise argparse.ArgumentTypeError("Should be in the form[logger name]=[log level][:optional timeout]")
     exec_storm_class(
         "org.apache.storm.command.SetLogLevel",
-        args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 def deactivate(args):
     exec_storm_class(
         "org.apache.storm.command.Deactivate",
-        args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def rebalance(args):
@@ -1186,41 +1205,46 @@
             raise argparse.ArgumentTypeError("Should be in the form component_name:new_executor_count")
     exec_storm_class(
         "org.apache.storm.command.Rebalance",
-        args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def get_errors(args):
     exec_storm_class(
         "org.apache.storm.command.GetErrors",
-        args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def healthcheck(args):
     exec_storm_class(
         "org.apache.storm.command.HealthCheck",
-        args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def kill_workers(args):
     exec_storm_class(
         "org.apache.storm.command.KillWorkers",
-        args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def admin(args):
     exec_storm_class(
         "org.apache.storm.command.AdminCommands",
-        args=sys.argv[2:], storm_config_opts=args.storm_config_opts,
+        args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def shell(args):
@@ -1233,21 +1257,24 @@
         args=runnerargs,
         jvmtype="-client",
         extrajars=[USER_CONF_DIR],
-        fork=True)
+        fork=True,
+        overriding_conf_file=args.config)
     os.system("rm " + tmpjarpath)
 
 
 def repl(args):
     cppaths = [CLUSTER_CONF_DIR]
     exec_storm_class(
-        "clojure.main", storm_config_opts=args.storm_config_opts, jvmtype="-client", extrajars=cppaths
+        "clojure.main", storm_config_opts=args.storm_config_opts, jvmtype="-client", extrajars=cppaths,
+        overriding_conf_file=args.config
     )
 
 
-def get_log4j2_conf_dir(storm_config_opts):
+def get_log4j2_conf_dir(storm_config_opts, args):
     cppaths = [CLUSTER_CONF_DIR]
     storm_log4j2_conf_dir = confvalue(
-        "storm.log4j2.conf.dir", storm_config_opts=storm_config_opts, extrapaths=cppaths
+        "storm.log4j2.conf.dir", storm_config_opts=storm_config_opts,
+        extrapaths=cppaths, overriding_conf_file=args.config
     )
     if(not storm_log4j2_conf_dir or storm_log4j2_conf_dir == "null"):
         storm_log4j2_conf_dir = STORM_LOG4J2_CONF_DIR
@@ -1260,18 +1287,19 @@
     cppaths = [CLUSTER_CONF_DIR]
     storm_config_opts = get_config_opts(args.storm_config_opts)
     jvmopts = shlex.split(confvalue(
-        "nimbus.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths
+        "nimbus.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths, overriding_conf_file=args.config
         )) + [
             "-Djava.deserialization.disabled=true",
             "-Dlogfile.name=nimbus.log",
-            "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml"),
+            "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml"),
         ]
     exec_storm_class(
         "org.apache.storm.daemon.nimbus.Nimbus", storm_config_opts=args.storm_config_opts,
         jvmtype="-server",
         daemonName="nimbus",
         extrajars=cppaths,
-        jvmopts=jvmopts)
+        jvmopts=jvmopts,
+        overriding_conf_file=args.config)
 
 
 def pacemaker(args):
@@ -1279,47 +1307,51 @@
     storm_config_opts = get_config_opts(args.storm_config_opts)
 
     jvmopts = shlex.split(confvalue(
-        "pacemaker.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths)
+        "pacemaker.childopts", storm_config_opts=storm_config_opts,
+        extrapaths=cppaths, overriding_conf_file=args.config)
     ) + [
         "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=pacemaker.log",
-        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml"),
+        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml"),
         ]
     exec_storm_class(
         "org.apache.storm.pacemaker.Pacemaker", storm_config_opts=args.storm_config_opts,
         jvmtype="-server",
         daemonName="pacemaker",
         extrajars=cppaths,
-        jvmopts=jvmopts)
+        jvmopts=jvmopts,
+        overriding_conf_file=args.config)
 
 
 def supervisor(args):
     cppaths = [CLUSTER_CONF_DIR]
     storm_config_opts = get_config_opts(args.storm_config_opts)
     jvmopts = shlex.split(confvalue(
-        "supervisor.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths)
+        "supervisor.childopts", storm_config_opts=storm_config_opts,
+        extrapaths=cppaths, overriding_conf_file=args.config)
     ) + [
         "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=" + STORM_SUPERVISOR_LOG_FILE,
-        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml"),
+        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml"),
         ]
     exec_storm_class(
         "org.apache.storm.daemon.supervisor.Supervisor", storm_config_opts=args.storm_config_opts,
         jvmtype="-server",
         daemonName="supervisor",
         extrajars=cppaths,
-        jvmopts=jvmopts)
+        jvmopts=jvmopts,
+        overriding_conf_file=args.config)
 
 
 def ui(args):
     cppaths = [CLUSTER_CONF_DIR]
     storm_config_opts = get_config_opts(args.storm_config_opts)
     jvmopts = shlex.split(confvalue(
-        "ui.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths)
+        "ui.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths, overriding_conf_file=args.config)
     ) + [
         "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=ui.log",
-        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
     ]
 
     allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR)
@@ -1329,7 +1361,8 @@
         jvmtype="-server",
         daemonName="ui",
         jvmopts=jvmopts,
-        extrajars=allextrajars)
+        extrajars=allextrajars,
+        overriding_conf_file=args.config)
 
 
 def logviewer(args):
@@ -1337,12 +1370,13 @@
     storm_config_opts = get_config_opts(args.storm_config_opts)
     jvmopts = shlex.split(
         confvalue(
-            "logviewer.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths
+            "logviewer.childopts", storm_config_opts=storm_config_opts,
+            extrapaths=cppaths, overriding_conf_file=args.config
         )
     ) + [
         "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=logviewer.log",
-        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
     ]
 
     allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR)
@@ -1352,7 +1386,8 @@
         jvmtype="-server",
         daemonName="logviewer",
         jvmopts=jvmopts,
-        extrajars=allextrajars)
+        extrajars=allextrajars,
+        overriding_conf_file=args.config)
 
 
 def drpc_client(args):
@@ -1366,9 +1401,10 @@
 
     exec_storm_class(
         "org.apache.storm.command.BasicDrpcClient",
-        args=sys.argv[2],
+        args=remove_common_options(sys.argv[2:]),
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
 
 def drpc(args):
@@ -1376,12 +1412,12 @@
     storm_config_opts = get_config_opts(args.storm_config_opts)
     jvmopts = shlex.split(
         confvalue(
-            "drpc.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths
+            "drpc.childopts", storm_config_opts=storm_config_opts, extrapaths=cppaths, overriding_conf_file=args.config
         )
     ) + [
         "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=drpc.log",
-        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
     ]
     allextrajars = get_wildcard_dir(STORM_WEBAPP_LIB_DIR)
     allextrajars.append(CLUSTER_CONF_DIR)
@@ -1390,28 +1426,31 @@
         jvmtype="-server",
         daemonName="drpc",
         jvmopts=jvmopts,
-        extrajars=allextrajars)
+        extrajars=allextrajars,
+        overriding_conf_file=args.config)
 
 
 def dev_zookeeper(args):
     storm_config_opts = get_config_opts(args.storm_config_opts)
     jvmopts = [
         "-Dlogfile.name=dev-zookeeper.log",
-        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts), "cluster.xml")
+        "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(storm_config_opts, args), "cluster.xml")
     ]
     exec_storm_class(
         "org.apache.storm.command.DevZookeeper", storm_config_opts=args.storm_config_opts,
         jvmtype="-server",
         daemonName="dev_zookeeper",
         jvmopts=jvmopts,
-        extrajars=[CLUSTER_CONF_DIR])
+        extrajars=[CLUSTER_CONF_DIR],
+        overriding_conf_file=args.config)
 
 
 def version(args):
     exec_storm_class(
         "org.apache.storm.utils.VersionInfo", storm_config_opts=args.storm_config_opts,
         jvmtype="-client",
-        extrajars=[CLUSTER_CONF_DIR])
+        extrajars=[CLUSTER_CONF_DIR],
+        overriding_conf_file=args.config)
 
 
 def print_classpath(args):
@@ -1425,7 +1464,8 @@
 def monitor(args):
     exec_storm_class(
         "org.apache.storm.command.Monitor", storm_config_opts=args.storm_config_opts,
-        args=sys.argv[2],
+        args=remove_common_options(sys.argv[2:]),
         jvmtype="-client",
-        extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+        extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
+        overriding_conf_file=args.config)
 
diff --git a/storm-client/test/py/test_storm_cli.py b/storm-client/test/py/test_storm_cli.py
index 1069a6b..8084452 100644
--- a/storm-client/test/py/test_storm_cli.py
+++ b/storm-client/test/py/test_storm_cli.py
@@ -42,10 +42,11 @@
         )
 
     def base_test(self, command_invocation, mock_shell_interface, expected_output):
+        print(command_invocation)
         with mock.patch.object(sys, "argv", command_invocation):
             self.cli_main()
         if expected_output not in mock_shell_interface.call_args_list:
-            print("Expected:"  + str(expected_output))
+            print("Expected:" + str(expected_output))
             print("Got:" + str(mock_shell_interface.call_args_list[-1]))
         assert expected_output in mock_shell_interface.call_args_list
 
@@ -59,7 +60,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client','-Ddaemon.name=', '-Dstorm.options=+topology.blobstore.map%3D%27%7B%22key1%22%3A%7B%22localname%22%3A%22blob_file%22%2C+%22uncompress%22%3Afalse%7D%2C%22key2%22%3A%7B%7D%7D%27',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:example/storm-starter/storm-starter-topologies-*.jar:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin:./external/storm-redis/storm-redis-1.1.0.jar:./external/storm-kafka-client/storm-kafka-client-1.1.0.jar"', '-Dstorm.jar=example/storm-starter/storm-starter-topologies-*.jar', '-Dstorm.dependency.jars=./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka-client/storm-kafka-client-1.1.0.jar"', '-Dstorm.dependency.artifacts={}',
                 'org.apache.storm.starter.RollingTopWords', 'blobstore-remote2', 'remote'
@@ -99,7 +100,7 @@
             ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client','-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:example/storm-starter/storm-starter-topologies-*.jar:' + self.storm_dir +
@@ -120,7 +121,7 @@
             ], self.mock_execvp, mock.call(
                 self.java_cmd,
                 [self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                 '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                 '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                  '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                  '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' +
                  self.storm_dir +
@@ -137,7 +138,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
                 '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin', 'org.apache.storm.command.KillTopology', 'doomed_topology'
@@ -146,17 +147,18 @@
 
     def test_upload_credentials_command(self):
         self.base_test([
-            'storm', 'upload-credentials', 'my-topology-name', 'appids role.name1,role.name2"'
+            'storm', 'upload-credentials', '--config', '/some/other/storm.yaml', '-c', 'test=test', 'my-topology-name', 'appids', 'role.name1,role.name2'
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
-                self.java_cmd,  '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=', '-Djava.library.path=',
-                '-Dstorm.conf.file=', '-cp', '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' +
+                self.java_cmd,  '-client', '-Ddaemon.name=', '-Dstorm.options=test%3Dtest',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
+                '-Djava.library.path=', '-Dstorm.conf.file=/some/other/storm.yaml',
+                '-cp', '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' +
                                              self.storm_dir +
                                              '/extlib:' + self.storm_dir + '/extlib-daemon:' +
                                              self.storm_dir + '/conf:' + self.storm_dir +
                                              '/bin', 'org.apache.storm.command.UploadCredentials',
-                'my-topology-name', 'appids role.name1,role.name2"'])
+                'my-topology-name', 'appids', 'role.name1,role.name2'])
         )
 
     def test_blobstore_command(self):
@@ -165,7 +167,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -180,7 +182,7 @@
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
                 '-Dstorm.home=' + self.storm_dir + '',
-                '-Dstorm.log.dir=', '-Djava.library.path=',
+                '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=',
                 '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -193,7 +195,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -207,7 +209,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
@@ -220,7 +222,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
                 '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir + '/bin',
@@ -234,7 +236,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
                 '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir +
@@ -248,7 +250,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
                 '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir +
@@ -262,7 +264,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir + '/extlib:' + self.storm_dir +
                 '/extlib-daemon:' + self.storm_dir + '/conf:' + self.storm_dir +
@@ -276,7 +278,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-server', '-Ddaemon.name=nimbus', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf',
@@ -292,7 +294,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-server', '-Ddaemon.name=supervisor', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf',
@@ -308,7 +310,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-server', '-Ddaemon.name=pacemaker', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf',
@@ -324,7 +326,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-server', '-Ddaemon.name=ui', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir +
@@ -341,7 +343,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-server', '-Ddaemon.name=logviewer', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir +
@@ -358,7 +360,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-server', '-Ddaemon.name=drpc', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs",
                 '-Djava.library.path=', '-Dstorm.conf.file=', '-cp',
                 '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' + self.storm_dir +
                 '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir +
@@ -375,7 +377,7 @@
         ], self.mock_execvp, mock.call(
             self.java_cmd, [
                 self.java_cmd, '-client', '-Ddaemon.name=', '-Dstorm.options=',
-                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=', '-Djava.library.path=',
+                '-Dstorm.home=' + self.storm_dir + '', '-Dstorm.log.dir=' + self.storm_dir + "/logs", '-Djava.library.path=',
                 '-Dstorm.conf.file=', '-cp', '' + self.storm_dir + '/*:' + self.storm_dir + '/lib:' +
                 self.storm_dir + '/extlib:' + self.storm_dir + '/extlib-daemon:' + self.storm_dir + '/conf:' +
                 self.storm_dir + '/bin', 'org.apache.storm.command.HealthCheck'