Merge pull request #3214 from RuiLi8080/MINOR-Nimbus

[MINOR] remove some unused fields and correct comment typo
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 23e577e..b4e4c80 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -47,7 +47,6 @@
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LogConfig;
-import org.apache.storm.generated.Supervisor;
 import org.apache.storm.generated.SupervisorWorkerHeartbeat;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.IContext;
@@ -75,6 +74,7 @@
     private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
     private static final Pattern BLOB_VERSION_EXTRACTION = Pattern.compile(".*\\.([0-9]+)$");
     private final Map<String, Object> conf;
+    private final Map<String, Object> topologyConf;
     private final IContext context;
     private final String topologyId;
     private final String assignmentId;
@@ -93,7 +93,6 @@
     private Collection<IAutoCredentials> autoCreds;
     private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier;
 
-
     /**
      * TODO: should worker even take the topologyId as input? this should be deducible from cluster state (by searching through assignments)
      * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency.
@@ -106,9 +105,9 @@
      * @param port           - port on which the worker runs
      * @param workerId       - worker id
      */
-
     public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
-                  int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier) {
+                  int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier)
+        throws IOException {
         this.conf = conf;
         this.context = context;
         this.topologyId = topologyId;
@@ -118,7 +117,25 @@
         this.workerId = workerId;
         this.logConfigManager = new LogConfigManager();
         this.metricRegistry = new StormMetricRegistry();
-        this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+
+        this.topologyConf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
+        if (supervisorIfaceSupplier == null) {
+            this.supervisorIfaceSupplier = () -> {
+                try {
+                    return SupervisorClient.getConfiguredClient(topologyConf, Utils.hostname(), supervisorPort);
+                } catch (UnknownHostException e) {
+                    throw Utils.wrapInRuntime(e);
+                }
+            };
+        } else {
+            this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+        }
+    }
+
+    public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
+                  int supervisorPort, int port, String workerId) throws IOException {
+        this(conf, context, topologyId, assignmentId, supervisorPort, port, workerId, null);
     }
 
     public static void main(String[] args) throws Exception {
@@ -132,15 +149,7 @@
         Utils.setupDefaultUncaughtExceptionHandler();
         StormCommon.validateDistributedMode(conf);
         int supervisorPortInt = Integer.parseInt(supervisorPort);
-        Supplier<SupervisorIfaceFactory> supervisorIfaceSuppler = () -> {
-            try {
-                return SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPortInt);
-            } catch (UnknownHostException e) {
-                throw Utils.wrapInRuntime(e);
-            }
-        };
-        Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt,
-                                   Integer.parseInt(portStr), workerId, supervisorIfaceSuppler);
+        Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
         worker.start();
         int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
         LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
@@ -161,8 +170,7 @@
             FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
                                         Charset.forName("UTF-8"));
         }
-        final Map<String, Object> topologyConf =
-            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
         ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
         IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
         IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
@@ -178,12 +186,12 @@
         subject = ClientAuthUtils.populateSubject(null, autoCreds, initCreds);
 
         Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
-            () -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials)
+            () -> loadWorker(stateStorage, stormClusterState, initCreds, initialCredentials)
         );
 
     }
 
-    private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
+    private Object loadWorker(IStateStorage stateStorage, IStormClusterState stormClusterState,
                               Map<String, String> initCreds, Credentials initialCredentials)
         throws Exception {
         workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
@@ -221,7 +229,7 @@
 
         List<Executor> execs = new ArrayList<>();
         for (List<Long> e : workerState.getLocalExecutors()) {
-            if (ConfigUtils.isLocalMode(topologyConf)) {
+            if (ConfigUtils.isLocalMode(conf)) {
                 Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
                 execs.add(executor);
                 for (int i = 0; i < executor.getTaskIds().size(); ++i) {
@@ -443,7 +451,7 @@
         } catch (Exception tr1) {
             //If any error/exception thrown, report directly to nimbus.
             LOG.warn("Exception when send heartbeat to local supervisor", tr1.getMessage());
-            try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)) {
+            try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(topologyConf)) {
                 nimbusClient.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
             } catch (Exception tr2) {
                 //if any error/exception thrown, just ignore.
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
index d6a345e..27b356a 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
@@ -34,7 +34,6 @@
 import javax.security.auth.login.LoginContext;
 import javax.security.auth.login.LoginException;
 import org.apache.log4j.Logger;
-import org.apache.storm.security.auth.ClientAuthUtils;
 import org.apache.storm.shade.org.apache.zookeeper.Shell;
 import org.apache.storm.shade.org.apache.zookeeper.client.ZooKeeperSaslClient;
 
@@ -296,8 +295,14 @@
                     + System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") + ")");
         }
         Configuration configuration = this.getConfiguration(jaasConfFile);
-        LoginContext loginContext = new LoginContext(loginContextName, null, callbackHandler, configuration);
-        loginContext.login();
+        LoginContext loginContext;
+        try {
+            loginContext = new LoginContext(loginContextName, null, callbackHandler, configuration);
+            loginContext.login();
+        } catch (LoginException e) {
+            LOG.error("Login using jaas conf " + jaasConfFile + " failed");
+            throw e;
+        }
         LOG.info("Successfully logged in to context " + loginContextName + " using " + jaasConfFile);
         return loginContext;
     }
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
index 6a9b703..3db487d 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
@@ -409,9 +409,17 @@
                 Set<Object> creds = subject.getPrivateCredentials();
                 synchronized (creds) {
                     WorkerToken previous = findWorkerToken(subject, type);
-                    creds.add(token);
-                    if (previous != null) {
-                        creds.remove(previous);
+                    boolean notAlreadyContained = creds.add(token);
+                    if (notAlreadyContained) {
+                        if (previous != null) {
+                            //this means token is not equal to previous so we should remove previous
+                            creds.remove(previous);
+                            LOG.info("Replaced WorkerToken for service type {}", type);
+                        } else {
+                            LOG.info("Added new WorkerToken for service type {}", type);
+                        }
+                    } else {
+                        LOG.info("The new WorkerToken for service type {} is the same as the previous token", type);
                     }
                 }
             }
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index 6936555..668d4b3 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -172,6 +172,7 @@
 
     @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
     private void populateSubjectWithTGT(Subject subject, Map<String, String> credentials) {
+        LOG.info("Populating TGT from credentials");
         KerberosTicket tgt = getTGT(credentials);
         if (tgt != null) {
             clearCredentials(subject, tgt);
diff --git a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index ccb5feb..734d660 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -138,7 +138,7 @@
         try {
             curator.delete().forPath(path);
         } catch (KeeperException.NoNodeException nne) {
-            LOG.warn("Path {} already deleted.");
+            LOG.warn("Path {} already deleted.", path);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java b/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
index c71a58c..564f68d 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java
@@ -26,6 +26,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.List;
@@ -144,7 +145,7 @@
                         ret = new VersionInfoImpl(info);
                         break;
                     } catch (IOException e) {
-                        LOG.error("Skipping {} get an error while trying to parse the file.", part, e);
+                        LOG.error("Skipping {}; got an error while trying to parse the file.", part, e);
                     }
                 }
             } else if (part.toLowerCase().endsWith(".jar")
@@ -164,10 +165,27 @@
                         }
                     }
                 } catch (IOException e) {
-                    LOG.error("Skipping {} get an error while trying to parse the jar file.", part, e);
+                    LOG.error("Skipping {}; got an error while trying to parse the jar file.", part, e);
+                }
+            } else if (p.endsWith("*")) {
+                //for a path like /<parent-path>/*
+                try {
+                    Path parent = p.getParent();
+                    List<String> children = new ArrayList<>();
+                    Files.list(parent)
+                        //avoid infinite recursion
+                        .filter(path -> !path.endsWith("*"))
+                        .forEach(path -> children.add(path.toString()));
+                    IVersionInfo resFromChildren = getFromClasspath(children, propFileName);
+                    if (resFromChildren != null) {
+                        ret = resFromChildren;
+                        break;
+                    }
+                } catch (NullPointerException | IOException e) {
+                    LOG.error("Skipping {}; got an error while trying to parse it", part, e);
                 }
             } else {
-                LOG.warn("Skipping {} don't know what to do with it.", part);
+                LOG.warn("Skipping {}; don't know what to do with it.", part);
             }
         }
         return ret;
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
index f52df59..eb4ce7d 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -79,7 +79,7 @@
                     throw new RuntimeException(e);
                 }
             }
-            LOG.debug("Will use {} for validation", ret);
+            LOG.info("Will use {} for validation", ret);
             configClasses = ret;
         }
         return configClasses;
diff --git a/storm-core/src/jvm/org/apache/storm/command/Rebalance.java b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
index 92dcc7b..ffaefda 100644
--- a/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
+++ b/storm-core/src/jvm/org/apache/storm/command/Rebalance.java
@@ -20,6 +20,7 @@
 import org.apache.storm.generated.RebalanceOptions;
 import org.apache.storm.utils.NimbusClient;
 import org.apache.storm.utils.Utils;
+import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +58,15 @@
         }
 
         Map<String, Object> confOverrides = (Map<String, Object>) cl.get("t");
+        Map<String, Object> jvmOpts = Utils.readCommandLineOpts(); // values in -Dstorm.options (originally -c in storm.py)
+        if (jvmOpts != null && !jvmOpts.isEmpty()) {
+            if (confOverrides == null) {
+                confOverrides = jvmOpts;
+            } else {
+                confOverrides.putAll(jvmOpts); // override with values obtained from -Dstorm.options
+            }
+            LOG.info("Rebalancing topology with overrides {}", JSONObject.toJSONString(confOverrides));
+        }
 
         if (null != confOverrides) {
             rebalanceOptions.set_topology_conf_overrides(JSONValue.toJSONString(confOverrides));