[STORM-3905] Replace anonymous class with lambda in storm-core commands (#3530)

* [STORM-3903] Bump commons-fileupload from 1.3.3 to 1.5

* [STORM-3905] Replace anonymous innner class in storm-core with lambda where possible.

* [STORM-3905] Remove unuxed imports and fix LOF param.

* [STORM-3905] Undo change for commons.fileupload
diff --git a/storm-core/src/jvm/org/apache/storm/command/Activate.java b/storm-core/src/jvm/org/apache/storm/command/Activate.java
index 73ad4f5..b619da0 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Activate.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Activate.java
@@ -12,7 +12,6 @@
 
 package org.apache.storm.command;
 
-import org.apache.storm.generated.Nimbus;
 import org.apache.storm.utils.NimbusClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -23,12 +22,9 @@
     public static void main(String[] args) throws Exception {
         final String name = args[0];
 
-        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
-            @Override
-            public void run(Nimbus.Iface nimbus) throws Exception {
-                nimbus.activate(name);
-                LOG.info("Activated topology: {}", name);
-            }
+        NimbusClient.withConfiguredClient(nimbus -> {
+            nimbus.activate(name);
+            LOG.info("Activated topology: {}", name);
         });
     }
 }
diff --git a/storm-core/src/jvm/org/apache/storm/command/BasicDrpcClient.java b/storm-core/src/jvm/org/apache/storm/command/BasicDrpcClient.java
index 9999b37..f995d93 100644
--- a/storm-core/src/jvm/org/apache/storm/command/BasicDrpcClient.java
+++ b/storm-core/src/jvm/org/apache/storm/command/BasicDrpcClient.java
@@ -27,7 +27,7 @@
 
     private static void runAndPrint(DRPCClient drpc, String func, String arg) throws Exception {
         String result = drpc.execute(func, arg);
-        System.out.println(String.format("%s \"%s\" => \"%s\"", func, arg, result));
+        System.out.printf("%s \"%s\" => \"%s\"%n", func, arg, result);
     }
 
     /**
diff --git a/storm-core/src/jvm/org/apache/storm/command/Blobstore.java b/storm-core/src/jvm/org/apache/storm/command/Blobstore.java
index d5be3c8..186e46d 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Blobstore.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Blobstore.java
@@ -151,47 +151,40 @@
     }
 
     private static void deleteCli(final String[] args) throws Exception {
-        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
-            @Override
-            public void run(ClientBlobStore blobStore) throws Exception {
-                for (String key : args) {
-                    blobStore.deleteBlob(key);
-
-                    LOG.info("deleted {}", key);
-                }
+        ClientBlobStore.withConfiguredClient(blobStore -> {
+            for (String key : args) {
+                blobStore.deleteBlob(key);
+                LOG.info("deleted {}", key);
             }
         });
     }
 
     private static void listCli(final String[] args) throws Exception {
-        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
-            @Override
-            public void run(ClientBlobStore blobStore) throws Exception {
-                Iterator<String> keys;
-                boolean isArgsEmpty = (args == null || args.length == 0);
-                if (isArgsEmpty) {
-                    keys = blobStore.listKeys();
-                } else {
-                    keys = Arrays.asList(args).iterator();
-                }
+        ClientBlobStore.withConfiguredClient(blobStore -> {
+            Iterator<String> keys;
+            boolean isArgsEmpty = (args == null || args.length == 0);
+            if (isArgsEmpty) {
+                keys = blobStore.listKeys();
+            } else {
+                keys = Arrays.asList(args).iterator();
+            }
 
-                while (keys.hasNext()) {
-                    String key = keys.next();
+            while (keys.hasNext()) {
+                String key = keys.next();
 
-                    try {
-                        ReadableBlobMeta meta = blobStore.getBlobMeta(key);
-                        long version = meta.get_version();
-                        List<AccessControl> acl = meta.get_settable().get_acl();
+                try {
+                    ReadableBlobMeta meta = blobStore.getBlobMeta(key);
+                    long version = meta.get_version();
+                    List<AccessControl> acl = meta.get_settable().get_acl();
 
-                        LOG.info("{} {} {}", key, version, generateAccessControlsInfo(acl));
-                    } catch (AuthorizationException ae) {
-                        if (!isArgsEmpty) {
-                            LOG.error("ACCESS DENIED to key: {}", key);
-                        }
-                    } catch (KeyNotFoundException knf) {
-                        if (!isArgsEmpty) {
-                            LOG.error("{} NOT FOUND", key);
-                        }
+                    LOG.info("{} {} {}", key, version, generateAccessControlsInfo(acl));
+                } catch (AuthorizationException ae) {
+                    if (!isArgsEmpty) {
+                        LOG.error("ACCESS DENIED to key: {}", key);
+                    }
+                } catch (KeyNotFoundException knf) {
+                    if (!isArgsEmpty) {
+                        LOG.error("{} NOT FOUND", key);
                     }
                 }
             }
@@ -206,22 +199,19 @@
         final String key = (String) cl.get("key");
         final List<AccessControl> setAcl = (List<AccessControl>) cl.get("s");
 
-        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
-            @Override
-            public void run(ClientBlobStore blobStore) throws Exception {
-                ReadableBlobMeta meta = blobStore.getBlobMeta(key);
-                List<AccessControl> acl = meta.get_settable().get_acl();
-                List<AccessControl> newAcl;
-                if (setAcl != null && !setAcl.isEmpty()) {
-                    newAcl = setAcl;
-                } else {
-                    newAcl = acl;
-                }
-
-                SettableBlobMeta newMeta = new SettableBlobMeta(newAcl);
-                LOG.info("Setting ACL for {} to {}", key, generateAccessControlsInfo(newAcl));
-                blobStore.setBlobMeta(key, newMeta);
+        ClientBlobStore.withConfiguredClient(blobStore -> {
+            ReadableBlobMeta meta = blobStore.getBlobMeta(key);
+            List<AccessControl> acl = meta.get_settable().get_acl();
+            List<AccessControl> newAcl;
+            if (setAcl != null && !setAcl.isEmpty()) {
+                newAcl = setAcl;
+            } else {
+                newAcl = acl;
             }
+
+            SettableBlobMeta newMeta = new SettableBlobMeta(newAcl);
+            LOG.info("Setting ACL for {} to {}", key, generateAccessControlsInfo(newAcl));
+            blobStore.setBlobMeta(key, newMeta);
         });
     }
 
@@ -283,33 +273,24 @@
 
     private static final class BlobStoreSupport {
         static void readBlob(final String key, final OutputStream os) throws Exception {
-            ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
-                @Override
-                public void run(ClientBlobStore blobStore) throws Exception {
-                    try (InputStreamWithMeta is = blobStore.getBlob(key)) {
-                        IOUtils.copy(is, os);
-                    }
+            ClientBlobStore.withConfiguredClient(blobStore -> {
+                try (InputStreamWithMeta is = blobStore.getBlob(key)) {
+                    IOUtils.copy(is, os);
                 }
             });
         }
 
         static void createBlobFromStream(final String key, final InputStream is, final SettableBlobMeta meta) throws Exception {
-            ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
-                @Override
-                public void run(ClientBlobStore blobStore) throws Exception {
-                    AtomicOutputStream os = blobStore.createBlob(key, meta);
-                    copyInputStreamToBlobOutputStream(is, os);
-                }
+            ClientBlobStore.withConfiguredClient(blobStore -> {
+                AtomicOutputStream os = blobStore.createBlob(key, meta);
+                copyInputStreamToBlobOutputStream(is, os);
             });
         }
 
         static void updateBlobFromStream(final String key, final InputStream is) throws Exception {
-            ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
-                @Override
-                public void run(ClientBlobStore blobStore) throws Exception {
-                    AtomicOutputStream os = blobStore.updateBlob(key);
-                    copyInputStreamToBlobOutputStream(is, os);
-                }
+            ClientBlobStore.withConfiguredClient(blobStore -> {
+                AtomicOutputStream os = blobStore.updateBlob(key);
+                copyInputStreamToBlobOutputStream(is, os);
             });
         }
 
diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java
index d9f3fe1..d111283 100644
--- a/storm-core/src/jvm/org/apache/storm/command/CLI.java
+++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java
@@ -28,7 +28,7 @@
     /**
      * Parse function to return an Integer.
      */
-    public static final Parse AS_INT = value -> Integer.valueOf(value);
+    public static final Parse AS_INT = Integer::valueOf;
 
     /**
      * Noop parse function, returns the String.
diff --git a/storm-core/src/jvm/org/apache/storm/command/Deactivate.java b/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
index d56f6ff..b8bbf59 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
@@ -12,7 +12,6 @@
 
 package org.apache.storm.command;
 
-import org.apache.storm.generated.Nimbus;
 import org.apache.storm.utils.NimbusClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -23,12 +22,9 @@
     public static void main(String[] args) throws Exception {
         final String name = args[0];
 
-        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
-            @Override
-            public void run(Nimbus.Iface nimbus) throws Exception {
-                nimbus.deactivate(name);
-                LOG.info("Deactivated topology: {}", name);
-            }
+        NimbusClient.withConfiguredClient(nimbus -> {
+            nimbus.deactivate(name);
+            LOG.info("Deactivated topology: {}", name);
         });
     }
 }
diff --git a/storm-core/src/jvm/org/apache/storm/command/GetErrors.java b/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
index c45272f..7a51889 100644
--- a/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
+++ b/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
@@ -22,7 +22,6 @@
 import org.apache.storm.generated.NumErrorsChoice;
 import org.apache.storm.generated.TopologyInfo;
 import org.apache.storm.utils.NimbusClient;
-import org.apache.storm.utils.Utils;
 import org.json.simple.JSONValue;
 
 public class GetErrors {
diff --git a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
index 4236d4d..22e0f95 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
@@ -56,7 +56,7 @@
         try {
             cluster.close();
         } catch (Exception e) {
-            LOG.info("Caught exception: {} on close.", e);
+            LOG.info("Caught exception: {} on close.", e.getMessage(), e);
         }
 
         // force process to be terminated
diff --git a/storm-core/src/jvm/org/apache/storm/command/KillTopology.java b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
index ef5bfff..c193ab0 100644
--- a/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
@@ -16,7 +16,6 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.storm.generated.KillOptions;
-import org.apache.storm.generated.Nimbus;
 import org.apache.storm.utils.NimbusClient;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
@@ -67,29 +66,24 @@
             opts.set_wait_secs(wait);
         }
 
-        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
-            @Override
-            public void run(Nimbus.Iface nimbus) throws Exception {
-                for (String name : names) {
-                    try {
-                        nimbus.killTopologyWithOpts(name, opts);
-                        LOG.info("Killed topology: {}", name);
-                    } catch (Exception e) {
-                        errorCount += 1;
-                        if (!continueOnError) {
-                            throw e;
-                        } else {
-                            LOG.error(
-                                    "Caught error killing topology '{}'; continuing as -i was passed.", name, e
-                            );
-                        }
+        NimbusClient.withConfiguredClient(nimbus -> {
+            for (String name : names) {
+                try {
+                    nimbus.killTopologyWithOpts(name, opts);
+                    LOG.info("Killed topology: {}", name);
+                } catch (Exception e) {
+                    errorCount += 1;
+                    if (continueOnError) {
+                        LOG.error("Caught error killing topology '{}'; continuing as -i was passed.", name, e);
+                    } else {
+                        throw e;
                     }
                 }
+            }
 
-                // If we failed to kill any topology, still exit with failure status
-                if (errorCount > 0) {
-                    throw new RuntimeException("Failed to successfully kill " + errorCount + " topologies.");
-                }
+            // If we failed to kill any topology, still exit with failure status
+            if (errorCount > 0) {
+                throw new RuntimeException("Failed to successfully kill " + errorCount + " topologies.");
             }
         });
     }
diff --git a/storm-core/src/jvm/org/apache/storm/command/ListTopologies.java b/storm-core/src/jvm/org/apache/storm/command/ListTopologies.java
index 0557873..b25ddd8 100644
--- a/storm-core/src/jvm/org/apache/storm/command/ListTopologies.java
+++ b/storm-core/src/jvm/org/apache/storm/command/ListTopologies.java
@@ -13,7 +13,6 @@
 package org.apache.storm.command;
 
 import java.util.List;
-import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.TopologySummary;
 import org.apache.storm.utils.NimbusClient;
 import org.slf4j.Logger;
@@ -24,28 +23,25 @@
     private static final String MSG_FORMAT = "%-20s %-10s %-10s %-12s %-12s %-20s %-20s\n";
 
     public static void main(String[] args) throws Exception {
-        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
-            @Override
-            public void run(Nimbus.Iface nimbus) throws Exception {
-                List<TopologySummary> topologies = nimbus.getTopologySummaries();
-                if (topologies == null || topologies.isEmpty()) {
-                    System.out.println("No topologies running.");
-                } else {
-                    System.out.printf(MSG_FORMAT,
-                            "Topology_name",
-                            "Status",
-                            "Num_tasks",
-                            "Num_workers",
-                            "Uptime_secs",
-                            "Topology_Id",
-                            "Owner");
-                    System.out.println("----------------------------------------------------------------------------------------");
-                    for (TopologySummary topology : topologies) {
-                        System.out.printf(MSG_FORMAT, topology.get_name(), topology.get_status(),
-                                          topology.get_num_tasks(), topology.get_num_workers(),
-                                          topology.get_uptime_secs(), topology.get_id(),
-                                          topology.get_owner());
-                    }
+        NimbusClient.withConfiguredClient(nimbus -> {
+            List<TopologySummary> topologies = nimbus.getTopologySummaries();
+            if (topologies == null || topologies.isEmpty()) {
+                System.out.println("No topologies running.");
+            } else {
+                System.out.printf(MSG_FORMAT,
+                        "Topology_name",
+                        "Status",
+                        "Num_tasks",
+                        "Num_workers",
+                        "Uptime_secs",
+                        "Topology_Id",
+                        "Owner");
+                System.out.println("----------------------------------------------------------------------------------------");
+                for (TopologySummary topology : topologies) {
+                    System.out.printf(MSG_FORMAT, topology.get_name(), topology.get_status(),
+                                      topology.get_num_tasks(), topology.get_num_workers(),
+                                      topology.get_uptime_secs(), topology.get_id(),
+                                      topology.get_owner());
                 }
             }
         });
diff --git a/storm-core/src/jvm/org/apache/storm/command/Monitor.java b/storm-core/src/jvm/org/apache/storm/command/Monitor.java
index 6fa69de..9e6972b 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Monitor.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Monitor.java
@@ -13,7 +13,6 @@
 package org.apache.storm.command;
 
 import java.util.Map;
-import org.apache.storm.generated.Nimbus;
 import org.apache.storm.utils.NimbusClient;
 
 public class Monitor {
@@ -47,11 +46,6 @@
             monitor.setTopology(topologyName);
         }
 
-        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
-            @Override
-            public void run(Nimbus.Iface nimbus) throws Exception {
-                monitor.metrics(nimbus);
-            }
-        });
+        NimbusClient.withConfiguredClient(nimbus -> monitor.metrics(nimbus));
     }
 }
diff --git a/storm-core/src/jvm/org/apache/storm/command/Rebalance.java b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
index 9fb5180..84207a3 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
@@ -16,7 +16,6 @@
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.RebalanceOptions;
 import org.apache.storm.utils.NimbusClient;
 import org.apache.storm.utils.Utils;
@@ -73,12 +72,9 @@
             rebalanceOptions.set_topology_conf_overrides(JSONValue.toJSONString(confOverrides));
         }
 
-        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
-            @Override
-            public void run(Nimbus.Iface nimbus) throws Exception {
-                nimbus.rebalance(name, rebalanceOptions);
-                LOG.info("Topology {} is rebalancing", name);
-            }
+        NimbusClient.withConfiguredClient(nimbus -> {
+            nimbus.rebalance(name, rebalanceOptions);
+            LOG.info("Topology {} is rebalancing", name);
         });
     }
 
diff --git a/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java b/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java
index e8e67cd..dcbecee 100644
--- a/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java
+++ b/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java
@@ -19,7 +19,6 @@
 import org.apache.storm.generated.LogConfig;
 import org.apache.storm.generated.LogLevel;
 import org.apache.storm.generated.LogLevelAction;
-import org.apache.storm.generated.Nimbus;
 import org.apache.storm.utils.NimbusClient;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
@@ -52,16 +51,13 @@
             logConfig.put_to_named_logger_level(entry.getKey(), entry.getValue());
         }
 
-        NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
-            @Override
-            public void run(Nimbus.Iface nimbus) throws Exception {
-                String topologyId = Utils.getTopologyId(topologyName, nimbus);
-                if (null == topologyId) {
-                    throw new IllegalArgumentException(topologyName + " is not a running topology");
-                }
-                nimbus.setLogConfig(topologyId, logConfig);
-                LOG.info("Log config {} is sent for topology {}", logConfig, topologyName);
+        NimbusClient.withConfiguredClient(nimbus -> {
+            String topologyId = Utils.getTopologyId(topologyName, nimbus);
+            if (null == topologyId) {
+                throw new IllegalArgumentException(topologyName + " is not a running topology");
             }
+            nimbus.setLogConfig(topologyId, logConfig);
+            LOG.info("Log config {} is sent for topology {}", logConfig, topologyName);
         });
     }
 
diff --git a/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java b/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
index bcfc398..06e3a3a 100644
--- a/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
+++ b/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
@@ -21,7 +21,6 @@
 import java.util.Set;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.ClusterSummary;
 import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.TopologySummary;
 import org.apache.storm.utils.NimbusClient;