[STORM-3579]  use the topo conf for thrift client in Worker code
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 23e577e..b4e4c80 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -47,7 +47,6 @@
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LogConfig;
-import org.apache.storm.generated.Supervisor;
 import org.apache.storm.generated.SupervisorWorkerHeartbeat;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.IContext;
@@ -75,6 +74,7 @@
     private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
     private static final Pattern BLOB_VERSION_EXTRACTION = Pattern.compile(".*\\.([0-9]+)$");
     private final Map<String, Object> conf;
+    private final Map<String, Object> topologyConf;
     private final IContext context;
     private final String topologyId;
     private final String assignmentId;
@@ -93,7 +93,6 @@
     private Collection<IAutoCredentials> autoCreds;
     private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier;
 
-
     /**
      * TODO: should worker even take the topologyId as input? this should be deducible from cluster state (by searching through assignments)
      * what about if there's inconsistency in assignments? -> but nimbus should guarantee this consistency.
@@ -106,9 +105,9 @@
      * @param port           - port on which the worker runs
      * @param workerId       - worker id
      */
-
     public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
-                  int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier) {
+                  int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier)
+        throws IOException {
         this.conf = conf;
         this.context = context;
         this.topologyId = topologyId;
@@ -118,7 +117,25 @@
         this.workerId = workerId;
         this.logConfigManager = new LogConfigManager();
         this.metricRegistry = new StormMetricRegistry();
-        this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+
+        this.topologyConf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
+        if (supervisorIfaceSupplier == null) {
+            this.supervisorIfaceSupplier = () -> {
+                try {
+                    return SupervisorClient.getConfiguredClient(topologyConf, Utils.hostname(), supervisorPort);
+                } catch (UnknownHostException e) {
+                    throw Utils.wrapInRuntime(e);
+                }
+            };
+        } else {
+            this.supervisorIfaceSupplier = supervisorIfaceSupplier;
+        }
+    }
+
+    public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
+                  int supervisorPort, int port, String workerId) throws IOException {
+        this(conf, context, topologyId, assignmentId, supervisorPort, port, workerId, null);
     }
 
     public static void main(String[] args) throws Exception {
@@ -132,15 +149,7 @@
         Utils.setupDefaultUncaughtExceptionHandler();
         StormCommon.validateDistributedMode(conf);
         int supervisorPortInt = Integer.parseInt(supervisorPort);
-        Supplier<SupervisorIfaceFactory> supervisorIfaceSuppler = () -> {
-            try {
-                return SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPortInt);
-            } catch (UnknownHostException e) {
-                throw Utils.wrapInRuntime(e);
-            }
-        };
-        Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt,
-                                   Integer.parseInt(portStr), workerId, supervisorIfaceSuppler);
+        Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
         worker.start();
         int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
         LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
@@ -161,8 +170,7 @@
             FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
                                         Charset.forName("UTF-8"));
         }
-        final Map<String, Object> topologyConf =
-            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
+
         ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
         IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
         IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
@@ -178,12 +186,12 @@
         subject = ClientAuthUtils.populateSubject(null, autoCreds, initCreds);
 
         Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
-            () -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials)
+            () -> loadWorker(stateStorage, stormClusterState, initCreds, initialCredentials)
         );
 
     }
 
-    private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
+    private Object loadWorker(IStateStorage stateStorage, IStormClusterState stormClusterState,
                               Map<String, String> initCreds, Credentials initialCredentials)
         throws Exception {
         workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
@@ -221,7 +229,7 @@
 
         List<Executor> execs = new ArrayList<>();
         for (List<Long> e : workerState.getLocalExecutors()) {
-            if (ConfigUtils.isLocalMode(topologyConf)) {
+            if (ConfigUtils.isLocalMode(conf)) {
                 Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
                 execs.add(executor);
                 for (int i = 0; i < executor.getTaskIds().size(); ++i) {
@@ -443,7 +451,7 @@
         } catch (Exception tr1) {
             //If any error/exception thrown, report directly to nimbus.
             LOG.warn("Exception when send heartbeat to local supervisor", tr1.getMessage());
-            try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)) {
+            try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(topologyConf)) {
                 nimbusClient.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
             } catch (Exception tr2) {
                 //if any error/exception thrown, just ignore.
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
index d6a345e..27b356a 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java
@@ -34,7 +34,6 @@
 import javax.security.auth.login.LoginContext;
 import javax.security.auth.login.LoginException;
 import org.apache.log4j.Logger;
-import org.apache.storm.security.auth.ClientAuthUtils;
 import org.apache.storm.shade.org.apache.zookeeper.Shell;
 import org.apache.storm.shade.org.apache.zookeeper.client.ZooKeeperSaslClient;
 
@@ -296,8 +295,14 @@
                     + System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") + ")");
         }
         Configuration configuration = this.getConfiguration(jaasConfFile);
-        LoginContext loginContext = new LoginContext(loginContextName, null, callbackHandler, configuration);
-        loginContext.login();
+        LoginContext loginContext;
+        try {
+            loginContext = new LoginContext(loginContextName, null, callbackHandler, configuration);
+            loginContext.login();
+        } catch (LoginException e) {
+            LOG.error("Login using jaas conf " + jaasConfFile + " failed");
+            throw e;
+        }
         LOG.info("Successfully logged in to context " + loginContextName + " using " + jaasConfFile);
         return loginContext;
     }