[STORM-3257] 'storm kill' command line should be able to continue on error (#2878)


diff --git a/storm-core/src/jvm/org/apache/storm/command/KillTopology.java b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
index 696c199..e58c08b 100644
--- a/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
@@ -25,21 +25,47 @@
 
     public static void main(String[] args) throws Exception {
         Map<String, Object> cl = CLI.opt("w", "wait", null, CLI.AS_INT)
+                                    .boolOpt("i", "ignore-errors")
                                     .arg("TOPO", CLI.INTO_LIST)
                                     .parse(args);
+
+        @SuppressWarnings("unchecked")
         final List<String> names = (List<String>) cl.get("TOPO");
+
+        // Wait this many seconds after deactivating topology before killing
         Integer wait = (Integer) cl.get("w");
 
+        // if '-i' is set, we'll try to kill every topology listed, even if an error occurs
+        Boolean continueOnError = (Boolean) cl.get("i");
+
         final KillOptions opts = new KillOptions();
         if (wait != null) {
             opts.set_wait_secs(wait);
         }
+
         NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
             @Override
             public void run(Nimbus.Iface nimbus) throws Exception {
+                int errorCount = 0;
                 for (String name : names) {
-                    nimbus.killTopologyWithOpts(name, opts);
-                    LOG.info("Killed topology: {}", name);
+                    try {
+                        nimbus.killTopologyWithOpts(name, opts);
+                        LOG.info("Killed topology: {}", name);
+                    } catch (Exception e) {
+                        errorCount += 1;
+                        if (!continueOnError) {
+                            throw e;
+                        } else {
+                            LOG.error(
+                                    "Caught error killing topology '{}'; continuing as -i was passed.", name, e
+                            );
+                        }
+                    }
+                }
+
+                // If we failed to kill any topology, still exit with failure status
+                if (errorCount > 0) {
+                    throw new RuntimeException("Failed to successfully kill " + errorCount + " topologies.");
                 }
             }
         });