[STORM-3905] Replace anonymous class with lambda in storm-core commands (#3530)
* [STORM-3903] Bump commons-fileupload from 1.3.3 to 1.5
* [STORM-3905] Replace anonymous innner class in storm-core with lambda where possible.
* [STORM-3905] Remove unuxed imports and fix LOF param.
* [STORM-3905] Undo change for commons.fileupload
diff --git a/storm-core/src/jvm/org/apache/storm/command/Activate.java b/storm-core/src/jvm/org/apache/storm/command/Activate.java
index 73ad4f5..b619da0 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Activate.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Activate.java
@@ -12,7 +12,6 @@
package org.apache.storm.command;
-import org.apache.storm.generated.Nimbus;
import org.apache.storm.utils.NimbusClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,12 +22,9 @@
public static void main(String[] args) throws Exception {
final String name = args[0];
- NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
- @Override
- public void run(Nimbus.Iface nimbus) throws Exception {
- nimbus.activate(name);
- LOG.info("Activated topology: {}", name);
- }
+ NimbusClient.withConfiguredClient(nimbus -> {
+ nimbus.activate(name);
+ LOG.info("Activated topology: {}", name);
});
}
}
diff --git a/storm-core/src/jvm/org/apache/storm/command/BasicDrpcClient.java b/storm-core/src/jvm/org/apache/storm/command/BasicDrpcClient.java
index 9999b37..f995d93 100644
--- a/storm-core/src/jvm/org/apache/storm/command/BasicDrpcClient.java
+++ b/storm-core/src/jvm/org/apache/storm/command/BasicDrpcClient.java
@@ -27,7 +27,7 @@
private static void runAndPrint(DRPCClient drpc, String func, String arg) throws Exception {
String result = drpc.execute(func, arg);
- System.out.println(String.format("%s \"%s\" => \"%s\"", func, arg, result));
+ System.out.printf("%s \"%s\" => \"%s\"%n", func, arg, result);
}
/**
diff --git a/storm-core/src/jvm/org/apache/storm/command/Blobstore.java b/storm-core/src/jvm/org/apache/storm/command/Blobstore.java
index d5be3c8..186e46d 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Blobstore.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Blobstore.java
@@ -151,47 +151,40 @@
}
private static void deleteCli(final String[] args) throws Exception {
- ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
- @Override
- public void run(ClientBlobStore blobStore) throws Exception {
- for (String key : args) {
- blobStore.deleteBlob(key);
-
- LOG.info("deleted {}", key);
- }
+ ClientBlobStore.withConfiguredClient(blobStore -> {
+ for (String key : args) {
+ blobStore.deleteBlob(key);
+ LOG.info("deleted {}", key);
}
});
}
private static void listCli(final String[] args) throws Exception {
- ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
- @Override
- public void run(ClientBlobStore blobStore) throws Exception {
- Iterator<String> keys;
- boolean isArgsEmpty = (args == null || args.length == 0);
- if (isArgsEmpty) {
- keys = blobStore.listKeys();
- } else {
- keys = Arrays.asList(args).iterator();
- }
+ ClientBlobStore.withConfiguredClient(blobStore -> {
+ Iterator<String> keys;
+ boolean isArgsEmpty = (args == null || args.length == 0);
+ if (isArgsEmpty) {
+ keys = blobStore.listKeys();
+ } else {
+ keys = Arrays.asList(args).iterator();
+ }
- while (keys.hasNext()) {
- String key = keys.next();
+ while (keys.hasNext()) {
+ String key = keys.next();
- try {
- ReadableBlobMeta meta = blobStore.getBlobMeta(key);
- long version = meta.get_version();
- List<AccessControl> acl = meta.get_settable().get_acl();
+ try {
+ ReadableBlobMeta meta = blobStore.getBlobMeta(key);
+ long version = meta.get_version();
+ List<AccessControl> acl = meta.get_settable().get_acl();
- LOG.info("{} {} {}", key, version, generateAccessControlsInfo(acl));
- } catch (AuthorizationException ae) {
- if (!isArgsEmpty) {
- LOG.error("ACCESS DENIED to key: {}", key);
- }
- } catch (KeyNotFoundException knf) {
- if (!isArgsEmpty) {
- LOG.error("{} NOT FOUND", key);
- }
+ LOG.info("{} {} {}", key, version, generateAccessControlsInfo(acl));
+ } catch (AuthorizationException ae) {
+ if (!isArgsEmpty) {
+ LOG.error("ACCESS DENIED to key: {}", key);
+ }
+ } catch (KeyNotFoundException knf) {
+ if (!isArgsEmpty) {
+ LOG.error("{} NOT FOUND", key);
}
}
}
@@ -206,22 +199,19 @@
final String key = (String) cl.get("key");
final List<AccessControl> setAcl = (List<AccessControl>) cl.get("s");
- ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
- @Override
- public void run(ClientBlobStore blobStore) throws Exception {
- ReadableBlobMeta meta = blobStore.getBlobMeta(key);
- List<AccessControl> acl = meta.get_settable().get_acl();
- List<AccessControl> newAcl;
- if (setAcl != null && !setAcl.isEmpty()) {
- newAcl = setAcl;
- } else {
- newAcl = acl;
- }
-
- SettableBlobMeta newMeta = new SettableBlobMeta(newAcl);
- LOG.info("Setting ACL for {} to {}", key, generateAccessControlsInfo(newAcl));
- blobStore.setBlobMeta(key, newMeta);
+ ClientBlobStore.withConfiguredClient(blobStore -> {
+ ReadableBlobMeta meta = blobStore.getBlobMeta(key);
+ List<AccessControl> acl = meta.get_settable().get_acl();
+ List<AccessControl> newAcl;
+ if (setAcl != null && !setAcl.isEmpty()) {
+ newAcl = setAcl;
+ } else {
+ newAcl = acl;
}
+
+ SettableBlobMeta newMeta = new SettableBlobMeta(newAcl);
+ LOG.info("Setting ACL for {} to {}", key, generateAccessControlsInfo(newAcl));
+ blobStore.setBlobMeta(key, newMeta);
});
}
@@ -283,33 +273,24 @@
private static final class BlobStoreSupport {
static void readBlob(final String key, final OutputStream os) throws Exception {
- ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
- @Override
- public void run(ClientBlobStore blobStore) throws Exception {
- try (InputStreamWithMeta is = blobStore.getBlob(key)) {
- IOUtils.copy(is, os);
- }
+ ClientBlobStore.withConfiguredClient(blobStore -> {
+ try (InputStreamWithMeta is = blobStore.getBlob(key)) {
+ IOUtils.copy(is, os);
}
});
}
static void createBlobFromStream(final String key, final InputStream is, final SettableBlobMeta meta) throws Exception {
- ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
- @Override
- public void run(ClientBlobStore blobStore) throws Exception {
- AtomicOutputStream os = blobStore.createBlob(key, meta);
- copyInputStreamToBlobOutputStream(is, os);
- }
+ ClientBlobStore.withConfiguredClient(blobStore -> {
+ AtomicOutputStream os = blobStore.createBlob(key, meta);
+ copyInputStreamToBlobOutputStream(is, os);
});
}
static void updateBlobFromStream(final String key, final InputStream is) throws Exception {
- ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
- @Override
- public void run(ClientBlobStore blobStore) throws Exception {
- AtomicOutputStream os = blobStore.updateBlob(key);
- copyInputStreamToBlobOutputStream(is, os);
- }
+ ClientBlobStore.withConfiguredClient(blobStore -> {
+ AtomicOutputStream os = blobStore.updateBlob(key);
+ copyInputStreamToBlobOutputStream(is, os);
});
}
diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java
index d9f3fe1..d111283 100644
--- a/storm-core/src/jvm/org/apache/storm/command/CLI.java
+++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java
@@ -28,7 +28,7 @@
/**
* Parse function to return an Integer.
*/
- public static final Parse AS_INT = value -> Integer.valueOf(value);
+ public static final Parse AS_INT = Integer::valueOf;
/**
* Noop parse function, returns the String.
diff --git a/storm-core/src/jvm/org/apache/storm/command/Deactivate.java b/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
index d56f6ff..b8bbf59 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Deactivate.java
@@ -12,7 +12,6 @@
package org.apache.storm.command;
-import org.apache.storm.generated.Nimbus;
import org.apache.storm.utils.NimbusClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,12 +22,9 @@
public static void main(String[] args) throws Exception {
final String name = args[0];
- NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
- @Override
- public void run(Nimbus.Iface nimbus) throws Exception {
- nimbus.deactivate(name);
- LOG.info("Deactivated topology: {}", name);
- }
+ NimbusClient.withConfiguredClient(nimbus -> {
+ nimbus.deactivate(name);
+ LOG.info("Deactivated topology: {}", name);
});
}
}
diff --git a/storm-core/src/jvm/org/apache/storm/command/GetErrors.java b/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
index c45272f..7a51889 100644
--- a/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
+++ b/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
@@ -22,7 +22,6 @@
import org.apache.storm.generated.NumErrorsChoice;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.utils.NimbusClient;
-import org.apache.storm.utils.Utils;
import org.json.simple.JSONValue;
public class GetErrors {
diff --git a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
index 4236d4d..22e0f95 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
@@ -56,7 +56,7 @@
try {
cluster.close();
} catch (Exception e) {
- LOG.info("Caught exception: {} on close.", e);
+ LOG.info("Caught exception: {} on close.", e.getMessage(), e);
}
// force process to be terminated
diff --git a/storm-core/src/jvm/org/apache/storm/command/KillTopology.java b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
index ef5bfff..c193ab0 100644
--- a/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
+++ b/storm-core/src/jvm/org/apache/storm/command/KillTopology.java
@@ -16,7 +16,6 @@
import java.util.List;
import java.util.Map;
import org.apache.storm.generated.KillOptions;
-import org.apache.storm.generated.Nimbus;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
@@ -67,29 +66,24 @@
opts.set_wait_secs(wait);
}
- NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
- @Override
- public void run(Nimbus.Iface nimbus) throws Exception {
- for (String name : names) {
- try {
- nimbus.killTopologyWithOpts(name, opts);
- LOG.info("Killed topology: {}", name);
- } catch (Exception e) {
- errorCount += 1;
- if (!continueOnError) {
- throw e;
- } else {
- LOG.error(
- "Caught error killing topology '{}'; continuing as -i was passed.", name, e
- );
- }
+ NimbusClient.withConfiguredClient(nimbus -> {
+ for (String name : names) {
+ try {
+ nimbus.killTopologyWithOpts(name, opts);
+ LOG.info("Killed topology: {}", name);
+ } catch (Exception e) {
+ errorCount += 1;
+ if (continueOnError) {
+ LOG.error("Caught error killing topology '{}'; continuing as -i was passed.", name, e);
+ } else {
+ throw e;
}
}
+ }
- // If we failed to kill any topology, still exit with failure status
- if (errorCount > 0) {
- throw new RuntimeException("Failed to successfully kill " + errorCount + " topologies.");
- }
+ // If we failed to kill any topology, still exit with failure status
+ if (errorCount > 0) {
+ throw new RuntimeException("Failed to successfully kill " + errorCount + " topologies.");
}
});
}
diff --git a/storm-core/src/jvm/org/apache/storm/command/ListTopologies.java b/storm-core/src/jvm/org/apache/storm/command/ListTopologies.java
index 0557873..b25ddd8 100644
--- a/storm-core/src/jvm/org/apache/storm/command/ListTopologies.java
+++ b/storm-core/src/jvm/org/apache/storm/command/ListTopologies.java
@@ -13,7 +13,6 @@
package org.apache.storm.command;
import java.util.List;
-import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.utils.NimbusClient;
import org.slf4j.Logger;
@@ -24,28 +23,25 @@
private static final String MSG_FORMAT = "%-20s %-10s %-10s %-12s %-12s %-20s %-20s\n";
public static void main(String[] args) throws Exception {
- NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
- @Override
- public void run(Nimbus.Iface nimbus) throws Exception {
- List<TopologySummary> topologies = nimbus.getTopologySummaries();
- if (topologies == null || topologies.isEmpty()) {
- System.out.println("No topologies running.");
- } else {
- System.out.printf(MSG_FORMAT,
- "Topology_name",
- "Status",
- "Num_tasks",
- "Num_workers",
- "Uptime_secs",
- "Topology_Id",
- "Owner");
- System.out.println("----------------------------------------------------------------------------------------");
- for (TopologySummary topology : topologies) {
- System.out.printf(MSG_FORMAT, topology.get_name(), topology.get_status(),
- topology.get_num_tasks(), topology.get_num_workers(),
- topology.get_uptime_secs(), topology.get_id(),
- topology.get_owner());
- }
+ NimbusClient.withConfiguredClient(nimbus -> {
+ List<TopologySummary> topologies = nimbus.getTopologySummaries();
+ if (topologies == null || topologies.isEmpty()) {
+ System.out.println("No topologies running.");
+ } else {
+ System.out.printf(MSG_FORMAT,
+ "Topology_name",
+ "Status",
+ "Num_tasks",
+ "Num_workers",
+ "Uptime_secs",
+ "Topology_Id",
+ "Owner");
+ System.out.println("----------------------------------------------------------------------------------------");
+ for (TopologySummary topology : topologies) {
+ System.out.printf(MSG_FORMAT, topology.get_name(), topology.get_status(),
+ topology.get_num_tasks(), topology.get_num_workers(),
+ topology.get_uptime_secs(), topology.get_id(),
+ topology.get_owner());
}
}
});
diff --git a/storm-core/src/jvm/org/apache/storm/command/Monitor.java b/storm-core/src/jvm/org/apache/storm/command/Monitor.java
index 6fa69de..9e6972b 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Monitor.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Monitor.java
@@ -13,7 +13,6 @@
package org.apache.storm.command;
import java.util.Map;
-import org.apache.storm.generated.Nimbus;
import org.apache.storm.utils.NimbusClient;
public class Monitor {
@@ -47,11 +46,6 @@
monitor.setTopology(topologyName);
}
- NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
- @Override
- public void run(Nimbus.Iface nimbus) throws Exception {
- monitor.metrics(nimbus);
- }
- });
+ NimbusClient.withConfiguredClient(nimbus -> monitor.metrics(nimbus));
}
}
diff --git a/storm-core/src/jvm/org/apache/storm/command/Rebalance.java b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
index 9fb5180..84207a3 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
@@ -16,7 +16,6 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.RebalanceOptions;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;
@@ -73,12 +72,9 @@
rebalanceOptions.set_topology_conf_overrides(JSONValue.toJSONString(confOverrides));
}
- NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
- @Override
- public void run(Nimbus.Iface nimbus) throws Exception {
- nimbus.rebalance(name, rebalanceOptions);
- LOG.info("Topology {} is rebalancing", name);
- }
+ NimbusClient.withConfiguredClient(nimbus -> {
+ nimbus.rebalance(name, rebalanceOptions);
+ LOG.info("Topology {} is rebalancing", name);
});
}
diff --git a/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java b/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java
index e8e67cd..dcbecee 100644
--- a/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java
+++ b/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java
@@ -19,7 +19,6 @@
import org.apache.storm.generated.LogConfig;
import org.apache.storm.generated.LogLevel;
import org.apache.storm.generated.LogLevelAction;
-import org.apache.storm.generated.Nimbus;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
@@ -52,16 +51,13 @@
logConfig.put_to_named_logger_level(entry.getKey(), entry.getValue());
}
- NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
- @Override
- public void run(Nimbus.Iface nimbus) throws Exception {
- String topologyId = Utils.getTopologyId(topologyName, nimbus);
- if (null == topologyId) {
- throw new IllegalArgumentException(topologyName + " is not a running topology");
- }
- nimbus.setLogConfig(topologyId, logConfig);
- LOG.info("Log config {} is sent for topology {}", logConfig, topologyName);
+ NimbusClient.withConfiguredClient(nimbus -> {
+ String topologyId = Utils.getTopologyId(topologyName, nimbus);
+ if (null == topologyId) {
+ throw new IllegalArgumentException(topologyName + " is not a running topology");
}
+ nimbus.setLogConfig(topologyId, logConfig);
+ LOG.info("Log config {} is sent for topology {}", logConfig, topologyName);
});
}
diff --git a/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java b/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
index bcfc398..06e3a3a 100644
--- a/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
+++ b/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
@@ -21,7 +21,6 @@
import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.utils.NimbusClient;