[STORM-3257] 'storm kill' command line should be able to continue on error (#2878)
diff --git a/storm-core/src/jvm/org/apache/storm/command/KillTopology.java b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
index 696c199..e58c08b 100644
--- a/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
@@ -25,21 +25,47 @@
public static void main(String[] args) throws Exception {
Map<String, Object> cl = CLI.opt("w", "wait", null, CLI.AS_INT)
+ .boolOpt("i", "ignore-errors")
.arg("TOPO", CLI.INTO_LIST)
.parse(args);
+
+ @SuppressWarnings("unchecked")
final List<String> names = (List<String>) cl.get("TOPO");
+
+ // Wait this many seconds after deactivating topology before killing
Integer wait = (Integer) cl.get("w");
+ // if '-i' is set, we'll try to kill every topology listed, even if an error occurs
+ Boolean continueOnError = (Boolean) cl.get("i");
+
final KillOptions opts = new KillOptions();
if (wait != null) {
opts.set_wait_secs(wait);
}
+
NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
@Override
public void run(Nimbus.Iface nimbus) throws Exception {
+ int errorCount = 0;
for (String name : names) {
- nimbus.killTopologyWithOpts(name, opts);
- LOG.info("Killed topology: {}", name);
+ try {
+ nimbus.killTopologyWithOpts(name, opts);
+ LOG.info("Killed topology: {}", name);
+ } catch (Exception e) {
+ errorCount += 1;
+ if (!continueOnError) {
+ throw e;
+ } else {
+ LOG.error(
+ "Caught error killing topology '{}'; continuing as -i was passed.", name, e
+ );
+ }
+ }
+ }
+
+ // If we failed to kill any topology, still exit with failure status
+ if (errorCount > 0) {
+ throw new RuntimeException("Failed to successfully kill " + errorCount + " topologies.");
}
}
});