Merge pull request #3181 from marjoral/STORM-3552

STORM-3552 Fix update log level from command line
diff --git a/docs/Setting-up-a-Storm-cluster.md b/docs/Setting-up-a-Storm-cluster.md
index ea05da4..b57639e 100644
--- a/docs/Setting-up-a-Storm-cluster.md
+++ b/docs/Setting-up-a-Storm-cluster.md
@@ -30,7 +30,7 @@
 Next you need to install Storm's dependencies on Nimbus and the worker machines. These are:
 
 1. Java 8+ (Apache Storm 2.x is tested through travis ci against a java 8 JDK)
-2. Python 2.6.6 (Python 3.x should work too, but is not tested as part of our CI enviornment)
+2. Python 2.7.x or Python 3.x
 
 These are the versions of the dependencies that have been tested with Storm. Storm may or may not work with different versions of Java and/or Python.
 
diff --git a/docs/flux.md b/docs/flux.md
index 27d28bb..000270f 100644
--- a/docs/flux.md
+++ b/docs/flux.md
@@ -47,7 +47,7 @@
 If you would like to build Flux from source and run the unit/integration tests, you will need the following installed
 on your system:
 
-* Python 2.6.x or later
+* Python 2.7.x or later
 * Node.js 0.10.x or later
 
 #### Building with unit tests enabled:
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
index c4d15fc..31425d4 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyClient.java
@@ -23,7 +23,6 @@
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.auth.kerberos.KerberosTicket;
-import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginException;
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
@@ -55,26 +54,16 @@
                   SaslUtils.KERBEROS);
 
         LOG.info("Creating Kerberos Client.");
-
-        Configuration loginConf;
-        try {
-            loginConf = ClientAuthUtils.getConfiguration(topoConf);
-        } catch (Throwable t) {
-            LOG.error("Failed to get loginConf: ", t);
-            throw t;
-        }
         LOG.debug("KerberosSaslNettyClient: authmethod {}", SaslUtils.KERBEROS);
 
         SaslClientCallbackHandler ch = new SaslClientCallbackHandler();
 
+        String jaasConfFile = ClientAuthUtils.getJaasConf(topoConf);
+
         subject = null;
         try {
-            LOG.debug("Setting Configuration to login_config: {}", loginConf);
-            //specify a configuration object to be used
-            Configuration.setConfiguration(loginConf);
-            //now login
-            LOG.debug("Trying to login.");
-            Login login = new Login(jaasSection, ch);
+            LOG.debug("Trying to login using {}.", jaasConfFile);
+            Login login = new Login(jaasSection, ch, jaasConfFile);
             subject = login.getSubject();
             LOG.debug("Got Subject: {}", subject.toString());
         } catch (LoginException ex) {
@@ -88,12 +77,12 @@
             throw new RuntimeException("Fail to verify user principal with section \""
                     + jaasSection
                     + "\" in login configuration file "
-                    + loginConf);
+                    + jaasConfFile);
         }
 
         String serviceName = null;
         try {
-            serviceName = ClientAuthUtils.get(loginConf, jaasSection, "serviceName");
+            serviceName = ClientAuthUtils.get(topoConf, jaasSection, "serviceName");
         } catch (IOException e) {
             LOG.error("Failed to get service name.", e);
             throw new RuntimeException(e);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
index ee410d6..fefcdc6 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslNettyServer.java
@@ -25,7 +25,6 @@
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.auth.kerberos.KerberosPrincipal;
 import javax.security.auth.kerberos.KerberosTicket;
-import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginException;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.Sasl;
@@ -49,28 +48,17 @@
 
     KerberosSaslNettyServer(Map<String, Object> topoConf, String jaasSection, List<String> authorizedUsers) {
         this.authorizedUsers = authorizedUsers;
-        LOG.debug("Getting Configuration.");
-        Configuration loginConf;
-        try {
-            loginConf = ClientAuthUtils.getConfiguration(topoConf);
-        } catch (Throwable t) {
-            LOG.error("Failed to get loginConf: ", t);
-            throw t;
-        }
 
         LOG.debug("KerberosSaslNettyServer: authmethod {}", SaslUtils.KERBEROS);
 
         KerberosSaslCallbackHandler ch = new KerberosSaslNettyServer.KerberosSaslCallbackHandler(authorizedUsers);
+        String jaasConfFile = ClientAuthUtils.getJaasConf(topoConf);
 
         //login our principal
         subject = null;
         try {
-            LOG.debug("Setting Configuration to login_config: {}", loginConf);
-            //specify a configuration object to be used
-            Configuration.setConfiguration(loginConf);
-            //now login
-            LOG.debug("Trying to login.");
-            Login login = new Login(jaasSection, ch);
+            LOG.debug("Trying to login using {}.", jaasConfFile);
+            Login login = new Login(jaasSection, ch, jaasConfFile);
             subject = login.getSubject();
             LOG.debug("Got Subject: {}", subject.toString());
         } catch (LoginException ex) {
@@ -84,7 +72,7 @@
             throw new RuntimeException("Fail to verify user principal with section \""
                                        + jaasSection
                                        + "\" in login configuration file "
-                                       + loginConf);
+                                       + jaasConfFile);
         }
 
         try {
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
index 6a50d84..d6a345e 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
@@ -19,6 +19,9 @@
  * does not die.
  */
 
+import java.io.File;
+import java.net.URI;
+import java.security.URIParameter;
 import java.util.Date;
 import java.util.Random;
 import java.util.Set;
@@ -31,6 +34,7 @@
 import javax.security.auth.login.LoginContext;
 import javax.security.auth.login.LoginException;
 import org.apache.log4j.Logger;
+import org.apache.storm.security.auth.ClientAuthUtils;
 import org.apache.storm.shade.org.apache.zookeeper.Shell;
 import org.apache.storm.shade.org.apache.zookeeper.client.ZooKeeperSaslClient;
 
@@ -57,12 +61,9 @@
     private Thread thread = null;
     private boolean isKrbTicket = false;
     private boolean isUsingTicketCache = false;
-    private boolean isUsingKeytab = false;
     private LoginContext login = null;
     private String loginContextName = null;
-    private String keytabFile = null;
     private String principal = null;
-
     private long lastLogin = 0;
 
     /**
@@ -77,14 +78,14 @@
      * @throws javax.security.auth.login.LoginException
      *               Thrown if authentication fails.
      */
-    public Login(final String loginContextName, CallbackHandler callbackHandler)
+    public Login(final String loginContextName, CallbackHandler callbackHandler, String jaasConfFile)
         throws LoginException {
         this.callbackHandler = callbackHandler;
-        login = login(loginContextName);
+        login = login(loginContextName, jaasConfFile);
         this.loginContextName = loginContextName;
         subject = login.getSubject();
         isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
-        AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+        AppConfigurationEntry[] entries = this.getConfiguration(jaasConfFile).getAppConfigurationEntry(loginContextName);
         for (AppConfigurationEntry entry : entries) {
             // there will only be a single entry, so this for() loop will only be iterated through once.
             if (entry.getOptions().get("useTicketCache") != null) {
@@ -93,10 +94,6 @@
                     isUsingTicketCache = true;
                 }
             }
-            if (entry.getOptions().get("keyTab") != null) {
-                keytabFile = (String) entry.getOptions().get("keyTab");
-                isUsingKeytab = true;
-            }
             if (entry.getOptions().get("principal") != null) {
                 principal = (String) entry.getOptions().get("principal");
             }
@@ -251,6 +248,19 @@
         thread.setDaemon(true);
     }
 
+    private Configuration getConfiguration(String jaasConfFile) {
+        File configFile = new File(jaasConfFile);
+        if (!configFile.canRead()) {
+            throw new RuntimeException("File " + jaasConfFile + " cannot be read.");
+        }
+        try {
+            URI configUri = configFile.toURI();
+            return Configuration.getInstance("JavaLoginConfig", new URIParameter(configUri));
+        } catch (Exception ex) {
+            throw new RuntimeException("Failed to get configuration for " + jaasConfFile, ex);
+        }
+    }
+
     public void startThreadIfNeeded() {
         // thread object 'thread' will be null if a refresh thread is not needed.
         if (thread != null) {
@@ -277,7 +287,7 @@
         return loginContextName;
     }
 
-    private synchronized LoginContext login(final String loginContextName) throws LoginException {
+    private synchronized LoginContext login(final String loginContextName, String jaasConfFile) throws LoginException {
         if (loginContextName == null) {
             throw new LoginException("loginContext name (JAAS file section header) was null. "
                     + "Please check your java.security.login.auth.config (="
@@ -285,9 +295,10 @@
                     + ") and your " + ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY + "(="
                     + System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") + ")");
         }
-        LoginContext loginContext = new LoginContext(loginContextName, callbackHandler);
+        Configuration configuration = this.getConfiguration(jaasConfFile);
+        LoginContext loginContext = new LoginContext(loginContextName, null, callbackHandler, configuration);
         loginContext.login();
-        LOG.info("successfully logged in.");
+        LOG.info("Successfully logged in to context " + loginContextName + " using " + jaasConfFile);
         return loginContext;
     }
 
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
index b77e003..30faf1f 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
@@ -73,9 +73,8 @@
         switch (auth) {
 
             case "DIGEST":
-                Configuration loginConf = ClientAuthUtils.getConfiguration(config);
                 authMethod = ThriftNettyClientCodec.AuthMethod.DIGEST;
-                secret = ClientAuthUtils.makeDigestPayload(loginConf, ClientAuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
+                secret = ClientAuthUtils.makeDigestPayload(config, ClientAuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
                 if (secret == null) {
                     LOG.error("Can't start pacemaker server without digest secret.");
                     throw new RuntimeException("Can't start pacemaker server without digest secret.");
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
index 9fb0e4b..6a9b703 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
@@ -60,6 +60,10 @@
     private static final String USERNAME = "username";
     private static final String PASSWORD = "password";
 
+    public static String getJaasConf(Map<String, Object> topoConf) {
+        return (String) topoConf.get("java.security.auth.login.config");
+    }
+
     /**
      * Construct a JAAS configuration object per storm configuration file.
      *
@@ -70,7 +74,7 @@
         Configuration loginConf = null;
 
         //find login file configuration from Storm configuration
-        String loginConfigurationFile = (String) topoConf.get("java.security.auth.login.config");
+        String loginConfigurationFile = getJaasConf(topoConf);
         if ((loginConfigurationFile != null) && (loginConfigurationFile.length() > 0)) {
             File configFile = new File(loginConfigurationFile);
             if (!configFile.canRead()) {
@@ -111,12 +115,13 @@
     /**
      * Pull a set of keys out of a Configuration.
      *
-     * @param configuration The config to pull the key/value pairs out of.
+     * @param topoConf  The config containing the jaas conf file.
      * @param section       The app configuration entry name to get stuff from.
      * @return Return a map of the configs in conf.
      */
-    public static SortedMap<String, ?> pullConfig(Configuration configuration,
+    public static SortedMap<String, ?> pullConfig(Map<String, Object> topoConf,
                                                   String section) throws IOException {
+        Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
         AppConfigurationEntry[] configurationEntries = ClientAuthUtils.getEntries(configuration, section);
 
         if (configurationEntries == null) {
@@ -138,12 +143,17 @@
     /**
      * Pull a the value given section and key from Configuration.
      *
-     * @param configuration The config to pull the key/value pairs out of.
+     * @param topoConf   The config containing the jaas conf file.
      * @param section       The app configuration entry name to get stuff from.
      * @param key           The key to look up inside of the section
      * @return Return a the String value of the configuration value
      */
-    public static String get(Configuration configuration, String section, String key) throws IOException {
+    public static String get(Map<String, Object> topoConf, String section, String key) throws IOException {
+        Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
+        return get(configuration, section, key);
+    }
+
+    static String get(Configuration configuration, String section, String key) throws IOException {
         AppConfigurationEntry[] configurationEntries = ClientAuthUtils.getEntries(configuration, section);
 
         if (configurationEntries == null) {
@@ -456,22 +466,22 @@
     /**
      * Construct a transport plugin per storm configuration.
      */
-    public static ITransportPlugin getTransportPlugin(ThriftConnectionType type, Map<String, Object> topoConf, Configuration loginConf) {
+    public static ITransportPlugin getTransportPlugin(ThriftConnectionType type, Map<String, Object> topoConf) {
         try {
             String transportPluginClassName = type.getTransportPlugin(topoConf);
             ITransportPlugin transportPlugin = ReflectionUtils.newInstance(transportPluginClassName);
-            transportPlugin.prepare(type, topoConf, loginConf);
+            transportPlugin.prepare(type, topoConf);
             return transportPlugin;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
-    public static String makeDigestPayload(Configuration loginConfig, String configSection) {
+    public static String makeDigestPayload(Map<String, Object> topoConf, String configSection) {
         String username = null;
         String password = null;
         try {
-            Map<String, ?> results = ClientAuthUtils.pullConfig(loginConfig, configSection);
+            Map<String, ?> results = ClientAuthUtils.pullConfig(topoConf, configSection);
             username = (String) results.get(USERNAME);
             password = (String) results.get(PASSWORD);
         } catch (Exception e) {
@@ -492,6 +502,8 @@
         }
     }
 
+
+
     public static byte[] serializeKerberosTicket(KerberosTicket tgt) throws Exception {
         ByteArrayOutputStream bao = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(bao);
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
index 6bf3b0b..19ec374 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
@@ -29,9 +29,8 @@
      *
      * @param type      the type of connection this will process.
      * @param topoConf  Storm configuration
-     * @param loginConf login configuration
      */
-    void prepare(ThriftConnectionType type, Map<String, Object> topoConf, Configuration loginConf);
+    void prepare(ThriftConnectionType type, Map<String, Object> topoConf);
 
     /**
      * Create a server associated with a given port, service handler, and purpose.
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
index 21a6ce1..0605385 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
@@ -48,14 +48,12 @@
     private static final Logger LOG = LoggerFactory.getLogger(SimpleTransportPlugin.class);
     protected ThriftConnectionType type;
     protected Map<String, Object> topoConf;
-    protected Configuration loginConf;
     private int port;
 
     @Override
-    public void prepare(ThriftConnectionType type, Map<String, Object> topoConf, Configuration loginConf) {
+    public void prepare(ThriftConnectionType type, Map<String, Object> topoConf) {
         this.type = type;
         this.topoConf = topoConf;
-        this.loginConf = loginConf;
     }
 
     @Override
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
index a9d9129..88d70ec 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java
@@ -83,11 +83,8 @@
                 socket.setTimeout(timeout);
             }
 
-            //locate login configuration 
-            Configuration loginConf = ClientAuthUtils.getConfiguration(conf);
-
             //construct a transport plugin
-            ITransportPlugin transportPlugin = ClientAuthUtils.getTransportPlugin(type, conf, loginConf);
+            ITransportPlugin transportPlugin = ClientAuthUtils.getTransportPlugin(type, conf);
 
             //TODO get this from type instead of hardcoding to Nimbus.
             //establish client-server transport via plugin
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
index f6102bc..9938b7c 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
@@ -12,10 +12,8 @@
 
 package org.apache.storm.security.auth;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
-import javax.security.auth.login.Configuration;
 import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
 import org.apache.storm.thrift.TProcessor;
 import org.apache.storm.thrift.server.TServer;
@@ -29,7 +27,6 @@
     private final Map<String, Object> conf; //storm configuration
     private final ThriftConnectionType type;
     private TServer server;
-    private Configuration loginConf;
     private int port;
     private boolean areWorkerTokensSupported;
     private ITransportPlugin transportPlugin;
@@ -40,14 +37,8 @@
         this.type = type;
 
         try {
-            //retrieve authentication configuration 
-            loginConf = ClientAuthUtils.getConfiguration(this.conf);
-        } catch (Exception x) {
-            LOG.error(x.getMessage(), x);
-        }
-        try {
             //locate our thrift transport plugin
-            transportPlugin = ClientAuthUtils.getTransportPlugin(this.type, this.conf, loginConf);
+            transportPlugin = ClientAuthUtils.getTransportPlugin(this.type, this.conf);
             //server
             server = transportPlugin.getServer(this.processor);
             port = transportPlugin.getPort();
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
index e3e4497..5a77d51 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -16,6 +16,8 @@
 import java.util.Map;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+
 import org.apache.storm.generated.WorkerToken;
 import org.apache.storm.security.auth.ClientAuthUtils;
 import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
@@ -44,7 +46,7 @@
         //create an authentication callback handler
         CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler(impersonationAllowed,
                                                                                     workerTokenAuthorizer,
-                                                                                    new JassPasswordProvider(loginConf));
+                                                                                    new JassPasswordProvider(conf));
 
         //create a transport factory that will invoke our auth callback for digest
         TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
@@ -60,7 +62,11 @@
         WorkerToken token = WorkerTokenClientCallbackHandler.findWorkerTokenInSubject(type);
         if (token != null) {
             clientCallbackHandler = new WorkerTokenClientCallbackHandler(token);
-        } else if (loginConf != null) {
+        } else {
+            Configuration loginConf = ClientAuthUtils.getConfiguration(conf);
+            if (loginConf == null) {
+                throw new IOException("Could not find any way to authenticate with the server.");
+            }
             AppConfigurationEntry[] configurationEntries = loginConf.getAppConfigurationEntry(ClientAuthUtils.LOGIN_CONTEXT_CLIENT);
             if (configurationEntries == null) {
                 String errorMessage = "Could not find a '" + ClientAuthUtils.LOGIN_CONTEXT_CLIENT
@@ -76,8 +82,6 @@
                 password = (String) options.getOrDefault("password", password);
             }
             clientCallbackHandler = new SimpleSaslClientCallbackHandler(username, password);
-        } else {
-            throw new IOException("Could not find any way to authenticate with the server.");
         }
 
         TSaslClientTransport wrapperTransport = new TSaslClientTransport(DIGEST,
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
index 5c0d7a4..1d5c7e3 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
@@ -36,10 +36,12 @@
     /**
      * Constructor.
      *
-     * @param configuration the jaas configuration to get the credentials out of.
+     * @param topoConf the configuration containing the jaas conf to use.
      * @throws IOException if we could not read the Server section in the jaas conf.
      */
-    public JassPasswordProvider(Configuration configuration) throws IOException {
+    public JassPasswordProvider(Map<String, Object> topoConf) throws IOException {
+
+        Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
         if (configuration == null) {
             return;
         }
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index c4f9f91..6936555 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -121,11 +121,10 @@
         //Log the user in and get the TGT
         try {
             Configuration loginConf = ClientAuthUtils.getConfiguration(conf);
-            ClientCallbackHandler clientCallbackHandler = new ClientCallbackHandler(loginConf);
+            ClientCallbackHandler clientCallbackHandler = new ClientCallbackHandler(conf);
 
             //login our user
-            Configuration.setConfiguration(loginConf);
-            LoginContext lc = new LoginContext(ClientAuthUtils.LOGIN_CONTEXT_CLIENT, clientCallbackHandler);
+            LoginContext lc = new LoginContext(ClientAuthUtils.LOGIN_CONTEXT_CLIENT, null, clientCallbackHandler, loginConf);
             try {
                 lc.login();
                 final Subject subject = lc.getSubject();
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java
index 64673ce..f9a8e09 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ClientCallbackHandler.java
@@ -13,6 +13,7 @@
 package org.apache.storm.security.auth.kerberos;
 
 import java.io.IOException;
+import java.util.Map;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
@@ -36,7 +37,8 @@
      *
      * <p>For digest, you should have a pair of user name and password defined in this figgure.
      */
-    public ClientCallbackHandler(Configuration configuration) throws IOException {
+    public ClientCallbackHandler(Map<String, Object> topoConf) throws IOException {
+        Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
         if (configuration == null) {
             return;
         }
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
index 915473b..5b42886 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -58,15 +58,15 @@
             workerTokenAuthorizer = new WorkerTokenAuthorizer(conf, type);
         }
         //create an authentication callback handler
-        CallbackHandler serverCallbackHandler = new ServerCallbackHandler(loginConf, impersonationAllowed);
+        CallbackHandler serverCallbackHandler = new ServerCallbackHandler(conf, impersonationAllowed);
+
+        String jaasConfFile = ClientAuthUtils.getJaasConf(conf);
 
         //login our principal
         Subject subject = null;
         try {
-            //specify a configuration object to be used
-            Configuration.setConfiguration(loginConf);
             //now login
-            Login login = new Login(ClientAuthUtils.LOGIN_CONTEXT_SERVER, serverCallbackHandler);
+            Login login = new Login(ClientAuthUtils.LOGIN_CONTEXT_SERVER, serverCallbackHandler, jaasConfFile);
             subject = login.getSubject();
             login.startThreadIfNeeded();
         } catch (LoginException ex) {
@@ -77,10 +77,10 @@
         //check the credential of our principal
         if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
             throw new RuntimeException("Fail to verify user principal with section \""
-                                       + ClientAuthUtils.LOGIN_CONTEXT_SERVER + "\" in login configuration file " + loginConf);
+                                       + ClientAuthUtils.LOGIN_CONTEXT_SERVER + "\" in login configuration file " + jaasConfFile);
         }
 
-        String principal = ClientAuthUtils.get(loginConf, ClientAuthUtils.LOGIN_CONTEXT_SERVER, "principal");
+        String principal = ClientAuthUtils.get(conf, ClientAuthUtils.LOGIN_CONTEXT_SERVER, "principal");
         LOG.debug("principal:" + principal);
         KerberosName serviceKerberosName = new KerberosName(principal);
         String serviceName = serviceKerberosName.getServiceName();
@@ -107,11 +107,9 @@
     private Login mkLogin() throws IOException {
         try {
             //create an authentication callback handler
-            ClientCallbackHandler clientCallbackHandler = new ClientCallbackHandler(loginConf);
-            //specify a configuration object to be used
-            Configuration.setConfiguration(loginConf);
+            ClientCallbackHandler clientCallbackHandler = new ClientCallbackHandler(conf);
             //now login
-            Login login = new Login(ClientAuthUtils.LOGIN_CONTEXT_CLIENT, clientCallbackHandler);
+            Login login = new Login(ClientAuthUtils.LOGIN_CONTEXT_CLIENT, clientCallbackHandler, ClientAuthUtils.getJaasConf(conf));
             login.startThreadIfNeeded();
             return login;
         } catch (LoginException ex) {
@@ -142,7 +140,7 @@
 
     private TTransport kerberosConnect(TTransport transport, String serverHost, String asUser) throws IOException {
         //login our user
-        SortedMap<String, ?> authConf = ClientAuthUtils.pullConfig(loginConf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT);
+        SortedMap<String, ?> authConf = ClientAuthUtils.pullConfig(conf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT);
         if (authConf == null) {
             throw new RuntimeException("Error in parsing the kerberos login Configuration, returned null");
         }
@@ -180,11 +178,11 @@
         final Subject subject = login.getSubject();
         if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error
             throw new RuntimeException("Fail to verify user principal with section \""
-                                       + ClientAuthUtils.LOGIN_CONTEXT_CLIENT + "\" in login configuration file " + loginConf);
+                    + ClientAuthUtils.LOGIN_CONTEXT_CLIENT + "\" in login configuration file " + ClientAuthUtils.getJaasConf(conf));
         }
 
         final String principal = StringUtils.isBlank(asUser) ? getPrincipal(subject) : asUser;
-        String serviceName = ClientAuthUtils.get(loginConf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
+        String serviceName = ClientAuthUtils.get(conf, ClientAuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
         if (serviceName == null) {
             serviceName = ClientAuthUtils.SERVICE;
         }
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
index 05630e9..daa5fbf 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
@@ -13,6 +13,7 @@
 package org.apache.storm.security.auth.kerberos;
 
 import java.io.IOException;
+import java.util.Map;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
@@ -35,8 +36,10 @@
     private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class);
     private final boolean impersonationAllowed;
 
-    public ServerCallbackHandler(Configuration configuration, boolean impersonationAllowed) throws IOException {
+    public ServerCallbackHandler(Map<String, Object> topoConf, boolean impersonationAllowed) throws IOException {
         this.impersonationAllowed = impersonationAllowed;
+
+        Configuration configuration = ClientAuthUtils.getConfiguration(topoConf);
         if (configuration == null) {
             return;
         }
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
index b204c1a..49f5b82 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
@@ -49,14 +49,12 @@
 public abstract class SaslTransportPlugin implements ITransportPlugin, Closeable {
     protected ThriftConnectionType type;
     protected Map<String, Object> conf;
-    protected Configuration loginConf;
     private int port;
 
     @Override
-    public void prepare(ThriftConnectionType type, Map<String, Object> conf, Configuration loginConf) {
+    public void prepare(ThriftConnectionType type, Map<String, Object> conf) {
         this.type = type;
         this.conf = conf;
-        this.loginConf = loginConf;
     }
 
     @Override
diff --git a/storm-client/test/jvm/org/apache/storm/security/auth/ClientAuthUtilsTest.java b/storm-client/test/jvm/org/apache/storm/security/auth/ClientAuthUtilsTest.java
index 9f09ee4..1aa854b 100644
--- a/storm-client/test/jvm/org/apache/storm/security/auth/ClientAuthUtilsTest.java
+++ b/storm-client/test/jvm/org/apache/storm/security/auth/ClientAuthUtilsTest.java
@@ -95,8 +95,9 @@
 
     @Test
     public void objGettersReturnNullWithNullConfigTest() throws IOException {
-        Assert.assertNull(ClientAuthUtils.pullConfig(null, "foo"));
-        Assert.assertNull(ClientAuthUtils.get(null, "foo", "bar"));
+        Map<String, Object> topoConf = new HashMap<>();
+        Assert.assertNull(ClientAuthUtils.pullConfig(topoConf, "foo"));
+        Assert.assertNull(ClientAuthUtils.get(topoConf, "foo", "bar"));
 
         Assert.assertNull(ClientAuthUtils.getConfiguration(Collections.emptyMap()));
     }
@@ -141,39 +142,6 @@
         Mockito.verify(autoCred, Mockito.times(1)).populateSubject(subject, cred);
     }
 
-    @Test
-    public void makeDigestPayloadTest() throws NoSuchAlgorithmException {
-        String section = "user-pass-section";
-        Map<String, String> optionMap = new HashMap<String, String>();
-        String user = "user";
-        String pass = "pass";
-        optionMap.put("username", user);
-        optionMap.put("password", pass);
-        AppConfigurationEntry entry = Mockito.mock(AppConfigurationEntry.class);
-
-        Mockito.<Map<String, ?>>when(entry.getOptions()).thenReturn(optionMap);
-        Configuration mockConfig = Mockito.mock(Configuration.class);
-        Mockito.when(mockConfig.getAppConfigurationEntry(section))
-               .thenReturn(new AppConfigurationEntry[]{ entry });
-
-        MessageDigest digest = MessageDigest.getInstance("SHA-512");
-        byte[] output = digest.digest((user + ":" + pass).getBytes());
-        String sha = Hex.encodeHexString(output);
-
-        // previous code used this method to generate the string, ensure the two match
-        StringBuilder builder = new StringBuilder();
-        for (byte b : output) {
-            builder.append(String.format("%02x", b));
-        }
-        String stringFormatMethod = builder.toString();
-
-        Assert.assertEquals(
-            ClientAuthUtils.makeDigestPayload(mockConfig, "user-pass-section"),
-            sha);
-
-        Assert.assertEquals(sha, stringFormatMethod);
-    }
-
     @Test(expected = RuntimeException.class)
     public void invalidConfigResultsInIOException() throws RuntimeException {
         HashMap<String, Object> conf = new HashMap<>();
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index df419b9..85e5f9a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -12,8 +12,10 @@
 
 package org.apache.storm.daemon.supervisor;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -36,13 +38,13 @@
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileAction;
 import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
 import org.apache.storm.localizer.AsyncLocalizer;
 import org.apache.storm.localizer.BlobChangingCallback;
 import org.apache.storm.localizer.GoodToGo;
 import org.apache.storm.localizer.LocallyCachedBlob;
 import org.apache.storm.metricstore.WorkerMetricsProcessor;
 import org.apache.storm.scheduler.ISupervisor;
-import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.utils.EnumUtil;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.ObjectReader;
@@ -174,9 +176,80 @@
     }
 
     /**
-     * Decide the equivalence of two local assignments, ignoring the order of executors
-     * This is different from #equal method.
-     * @param first Local assignment A
+     * This method compares WorkerResources while considering any resources are NULL to be 0.0
+     *
+     * @param first  WorkerResources A
+     * @param second WorkerResources B
+     * @return True if A and B are equivalent, treating the absent resources as 0.0
+     */
+    @VisibleForTesting
+    static boolean customWorkerResourcesEquality(WorkerResources first, WorkerResources second) {
+        if (first == null) {
+            return false;
+        }
+        if (first == second) {
+            return true;
+        }
+        if (first.equals(second)) {
+            return true;
+        }
+
+        if (first.get_cpu() != second.get_cpu()) {
+            return false;
+        }
+        if (first.get_mem_on_heap() != second.get_mem_on_heap()) {
+            return false;
+        }
+        if (first.get_mem_off_heap() != second.get_mem_off_heap()) {
+            return false;
+        }
+        if (first.get_shared_mem_off_heap() != second.get_shared_mem_off_heap()) {
+            return false;
+        }
+        if (first.get_shared_mem_on_heap() != second.get_shared_mem_on_heap()) {
+            return false;
+        }
+        if (!customResourceMapEquality(first.get_resources(), second.get_resources())) {
+            return false;
+        }
+        if (!customResourceMapEquality(first.get_shared_resources(), second.get_shared_resources())) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * This method compares Resource Maps while considering any resources are NULL to be 0.0
+     *
+     * @param firstMap  Resource Map A
+     * @param secondMap Resource Map B
+     * @return True if A and B are equivalent, treating the absent resources as 0.0
+     */
+    private static boolean customResourceMapEquality(Map<String, Double> firstMap, Map<String, Double> secondMap) {
+        if (firstMap == null && secondMap == null) {
+            return true;
+        }
+        if (firstMap == null) {
+            firstMap = new HashMap<>();
+        }
+        if (secondMap == null) {
+            secondMap = new HashMap<>();
+        }
+
+        Set<String> keys = new HashSet<>(firstMap.keySet());
+        keys.addAll(secondMap.keySet());
+        for (String key : keys) {
+            if (firstMap.getOrDefault(key, 0.0).doubleValue() != secondMap.getOrDefault(key, 0.0).doubleValue()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Decide the equivalence of two local assignments, ignoring the order of executors This is different from #equal method.
+     *
+     * @param first  Local assignment A
      * @param second Local assignment B
      * @return True if A and B are equivalent, ignoring the order of the executors
      */
@@ -196,9 +269,9 @@
                         return true;
                     }
                     if (firstHasResources && secondHasResources) {
-                        if (first.get_resources().equals(second.get_resources())) {
-                            return true;
-                        }
+                        WorkerResources firstResources = first.get_resources();
+                        WorkerResources secondResources = second.get_resources();
+                        return customWorkerResourcesEquality(firstResources, secondResources);
                     }
                 }
             }
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index fdd130b..48ad7e8 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -262,22 +262,26 @@
                     while (!done) {
                         try {
                             synchronized (blob) {
-                                long localVersion = blob.getLocalVersion();
-                                long remoteVersion = blob.getRemoteVersion(blobStore);
-                                if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
-                                    if (blob.isFullyDownloaded()) {
-                                        //Avoid case of different blob version
-                                        // when blob is not downloaded (first time download)
-                                        numBlobUpdateVersionChanged.mark();
+                                if (blob.isUsed()) {
+                                    long localVersion = blob.getLocalVersion();
+                                    long remoteVersion = blob.getRemoteVersion(blobStore);
+                                    if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
+                                        if (blob.isFullyDownloaded()) {
+                                            //Avoid case of different blob version
+                                            // when blob is not downloaded (first time download)
+                                            numBlobUpdateVersionChanged.mark();
+                                        }
+                                        Timer.Context t = singleBlobLocalizationDuration.time();
+                                        try {
+                                            long newVersion = blob.fetchUnzipToTemp(blobStore);
+                                            blob.informReferencesAndCommitNewVersion(newVersion);
+                                            t.stop();
+                                        } finally {
+                                            blob.cleanupOrphanedData();
+                                        }
                                     }
-                                    Timer.Context t = singleBlobLocalizationDuration.time();
-                                    try {
-                                        long newVersion = blob.fetchUnzipToTemp(blobStore);
-                                        blob.informReferencesAndCommitNewVersion(newVersion);
-                                        t.stop();
-                                    } finally {
-                                        blob.cleanupOrphanedData();
-                                    }
+                                } else {
+                                    LOG.debug("Skipping update of unused blob {}", blob);
                                 }
                             }
                             done = true;
diff --git a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
index d8364dc..19e6cbe 100644
--- a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
+++ b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
@@ -63,9 +63,8 @@
         switch (auth) {
 
             case "DIGEST":
-                Configuration loginConf = ClientAuthUtils.getConfiguration(config);
                 authMethod = ThriftNettyServerCodec.AuthMethod.DIGEST;
-                this.secret = ClientAuthUtils.makeDigestPayload(loginConf, ClientAuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
+                this.secret = ClientAuthUtils.makeDigestPayload(config, ClientAuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
                 if (this.secret == null) {
                     LOG.error("Can't start pacemaker server without digest secret.");
                     throw new RuntimeException("Can't start pacemaker server without digest secret.");
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index 9dfe2d7..1cfea8a 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -15,10 +15,13 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 
+import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -77,6 +80,14 @@
         return resources;
     }
 
+    static WorkerResources mkWorkerResources(Double cpu, Double mem_on_heap, Double mem_off_heap, Map<String, Double> resources) {
+        WorkerResources workerResources = mkWorkerResources(cpu, mem_on_heap, mem_off_heap);
+        if (resources != null) {
+            workerResources.set_resources(resources);
+        }
+        return workerResources;
+    }
+
     static LSWorkerHeartbeat mkWorkerHB(String id, int port, List<ExecutorInfo> exec, Integer timeSecs) {
         LSWorkerHeartbeat ret = new LSWorkerHeartbeat();
         ret.set_topology_id(id);
@@ -108,12 +119,64 @@
     }
 
     @Test
-    public void testEquivilant() {
+    public void testWorkerResourceEquality() {
+        WorkerResources resourcesRNull = mkWorkerResources(100.0, 100.0, 100.0, null);
+        WorkerResources resourcesREmpty = mkWorkerResources(100.0, 100.0, 100.0, Maps.newHashMap());
+        assertTrue(Slot.customWorkerResourcesEquality(resourcesRNull,resourcesREmpty));
+
+        Map resources = new HashMap<String, Double>();
+        resources.put("network.resource.units", 0.0);
+        WorkerResources resourcesRNetwork = mkWorkerResources(100.0, 100.0, 100.0,resources);
+        assertTrue(Slot.customWorkerResourcesEquality(resourcesREmpty, resourcesRNetwork));
+
+
+        Map resourcesNetwork = new HashMap<String, Double>();
+        resourcesNetwork.put("network.resource.units", 50.0);
+        WorkerResources resourcesRNetworkNonZero = mkWorkerResources(100.0, 100.0, 100.0,resourcesNetwork);
+        assertFalse(Slot.customWorkerResourcesEquality(resourcesREmpty, resourcesRNetworkNonZero));
+
+        Map resourcesNetworkOne = new HashMap<String, Double>();
+        resourcesNetworkOne.put("network.resource.units", 50.0);
+        WorkerResources resourcesRNetworkOne = mkWorkerResources(100.0, 100.0, 100.0,resourcesNetworkOne);
+        assertTrue(Slot.customWorkerResourcesEquality(resourcesRNetworkOne, resourcesRNetworkNonZero));
+
+        Map resourcesNetworkTwo = new HashMap<String, Double>();
+        resourcesNetworkTwo.put("network.resource.units", 100.0);
+        WorkerResources resourcesRNetworkTwo = mkWorkerResources(100.0, 100.0, 100.0,resourcesNetworkTwo);
+        assertFalse(Slot.customWorkerResourcesEquality(resourcesRNetworkOne, resourcesRNetworkTwo));
+
+        WorkerResources resourcesCpuNull = mkWorkerResources(null, 100.0,100.0);
+        WorkerResources resourcesCPUZero = mkWorkerResources(0.0, 100.0,100.0);
+        assertTrue(Slot.customWorkerResourcesEquality(resourcesCpuNull, resourcesCPUZero));
+
+        WorkerResources resourcesOnHeapMemNull = mkWorkerResources(100.0, null,100.0);
+        WorkerResources resourcesOnHeapMemZero = mkWorkerResources(100.0, 0.0,100.0);
+        assertTrue(Slot.customWorkerResourcesEquality(resourcesOnHeapMemNull, resourcesOnHeapMemZero));
+
+        WorkerResources resourcesOffHeapMemNull = mkWorkerResources(100.0, 100.0,null);
+        WorkerResources resourcesOffHeapMemZero = mkWorkerResources(100.0, 100.0,0.0);
+        assertTrue(Slot.customWorkerResourcesEquality(resourcesOffHeapMemNull, resourcesOffHeapMemZero));
+
+    }
+
+    @Test
+    public void testEquivalent() {
         LocalAssignment a = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 100.0, 100.0));
         LocalAssignment aResized = mkLocalAssignment("A", mkExecutorInfoList(1, 2, 3, 4, 5), mkWorkerResources(100.0, 200.0, 100.0));
         LocalAssignment b = mkLocalAssignment("B", mkExecutorInfoList(1, 2, 3, 4, 5, 6), mkWorkerResources(100.0, 100.0, 100.0));
         LocalAssignment bReordered = mkLocalAssignment("B", mkExecutorInfoList(6, 5, 4, 3, 2, 1), mkWorkerResources(100.0, 100.0, 100.0));
 
+        LocalAssignment c = mkLocalAssignment("C", mkExecutorInfoList(188, 261),mkWorkerResources(400.0,10000.0,0.0));
+
+        WorkerResources workerResources = mkWorkerResources(400.0, 10000.0, 0.0);
+        Map<String, Double> additionalResources = workerResources.get_resources();
+        if( additionalResources == null) additionalResources = new HashMap<>();
+        additionalResources.put("network.resource.units", 0.0);
+
+        workerResources.set_resources(additionalResources);
+        LocalAssignment cReordered = mkLocalAssignment("C", mkExecutorInfoList(188, 261), workerResources);
+
+        assertTrue(Slot.equivalent(c,cReordered));
         assertTrue(Slot.equivalent(null, null));
         assertTrue(Slot.equivalent(a, a));
         assertTrue(Slot.equivalent(b, bReordered));
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index d3c1d74..f97c868 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -126,18 +126,21 @@
             when(jarBlob.getLocalVersion()).thenReturn(-1L);
             when(jarBlob.getRemoteVersion(any())).thenReturn(100L);
             when(jarBlob.fetchUnzipToTemp(any())).thenReturn(100L);
+            when(jarBlob.isUsed()).thenReturn(true);
 
             LocallyCachedTopologyBlob codeBlob = mock(LocallyCachedTopologyBlob.class);
             doReturn(codeBlob).when(victim).getTopoCode(topoId, localAssignment.get_owner());
             when(codeBlob.getLocalVersion()).thenReturn(-1L);
             when(codeBlob.getRemoteVersion(any())).thenReturn(200L);
             when(codeBlob.fetchUnzipToTemp(any())).thenReturn(200L);
+            when(codeBlob.isUsed()).thenReturn(true);
 
             LocallyCachedTopologyBlob confBlob = mock(LocallyCachedTopologyBlob.class);
             doReturn(confBlob).when(victim).getTopoConf(topoId, localAssignment.get_owner());
             when(confBlob.getLocalVersion()).thenReturn(-1L);
             when(confBlob.getRemoteVersion(any())).thenReturn(300L);
             when(confBlob.fetchUnzipToTemp(any())).thenReturn(300L);
+            when(confBlob.isUsed()).thenReturn(true);
 
             when(mockedReflectionUtils.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore);
 
diff --git a/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java b/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java
index 49a4641..7802390 100644
--- a/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java
+++ b/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java
@@ -576,7 +576,7 @@
     public void getTransportPluginThrowsRunimeTest() {
         Map<String, Object> conf = ConfigUtils.readStormConfig();
         conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "null.invalid");
-        ClientAuthUtils.getTransportPlugin(ThriftConnectionType.NIMBUS, conf, null);
+        ClientAuthUtils.getTransportPlugin(ThriftConnectionType.NIMBUS, conf);
     }
 
     @Test