[STORM-3658] Avoid deadlock and race condition caused by shutdown hooks (#3299)

diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index f703eda..b0075d9 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -152,10 +152,14 @@
         StormCommon.validateDistributedMode(conf);
         int supervisorPortInt = Integer.parseInt(supervisorPort);
         Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
-        worker.start();
+
+        // Add shutdown hooks before starting any other threads, to avoid a possible race
+        // between invoking a shutdown hook and registering it. See STORM-3658.
         int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
         LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
         Utils.addShutdownHookWithDelayedForceKill(worker::shutdown, workerShutdownSleepSecs);
+
+        worker.start();
     }
 
     public void start() throws Exception {
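
Why the reorder in the hunk above matters: if the process receives a SIGTERM in the window between `worker.start()` and the `addShutdownHookWithDelayedForceKill` call, the worker's threads are already running with no hook registered to stop them, and `Runtime.addShutdownHook` throws `IllegalStateException` once shutdown has begun. A minimal standalone sketch of the safe ordering, with a hypothetical `Service` class standing in for `Worker`:

```java
public final class SafeStartup {

    public static void main(String[] args) {
        Service service = new Service();

        // Register the hook first: any SIGTERM arriving after this line is
        // guaranteed to run stopThreads(). Registering after startThreads()
        // leaves a window where threads run with no hook, and addShutdownHook
        // throws IllegalStateException once shutdown has already begun.
        Runtime.getRuntime().addShutdownHook(
            new Thread(service::stopThreads, "service-shutdown"));

        service.startThreads();
    }

    // Hypothetical stand-in for Worker; thread management bodies elided.
    static final class Service {
        void startThreads() { /* spawn worker threads */ }
        void stopThreads()  { /* interrupt and join them */ }
    }
}
```
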
@@ -477,54 +481,59 @@
         try {
             LOG.info("Shutting down worker {} {} {}", topologyId, assignmentId, port);
 
-            for (IConnection socket : workerState.cachedNodeToPortSocket.get().values()) {
-                //this will do best effort flushing since the linger period
-                // was set on creation
-                socket.close();
-            }
-            LOG.info("Terminating messaging context");
-            LOG.info("Shutting down executors");
-            for (IRunningExecutor executor : executorsAtom.get()) {
-                ((ExecutorShutdown) executor).shutdown();
-            }
-            LOG.info("Shut down executors");
+            if (workerState != null) {
+                for (IConnection socket : workerState.cachedNodeToPortSocket.get().values()) {
+                    //this will do best effort flushing since the linger period
+                    // was set on creation
+                    socket.close();
+                }
+                LOG.info("Terminating messaging context");
+                LOG.info("Shutting down executors");
+                for (IRunningExecutor executor : executorsAtom.get()) {
+                    ((ExecutorShutdown) executor).shutdown();
+                }
+                LOG.info("Shut down executors");
 
-            LOG.info("Shutting down transfer thread");
-            workerState.haltWorkerTransfer();
+                LOG.info("Shutting down transfer thread");
+                workerState.haltWorkerTransfer();
 
-            if (transferThread != null) {
-                transferThread.interrupt();
-                transferThread.join();
-                LOG.info("Shut down transfer thread");
+                if (transferThread != null) {
+                    transferThread.interrupt();
+                    transferThread.join();
+                    LOG.info("Shut down transfer thread");
+                }
+
+                workerState.heartbeatTimer.close();
+                workerState.refreshConnectionsTimer.close();
+                workerState.refreshCredentialsTimer.close();
+                workerState.checkForUpdatedBlobsTimer.close();
+                workerState.refreshActiveTimer.close();
+                workerState.executorHeartbeatTimer.close();
+                workerState.userTimer.close();
+                workerState.refreshLoadTimer.close();
+                workerState.resetLogLevelsTimer.close();
+                workerState.flushTupleTimer.close();
+                workerState.backPressureCheckTimer.close();
+
+                // this is fine because the only time this is shared is when it's a local context,
+                // in which case it's a noop
+                workerState.mqContext.term();
+
+                workerState.closeResources();
+
+                LOG.info("Trigger any worker shutdown hooks");
+                workerState.runWorkerShutdownHooks();
+
+                workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port);
+                LOG.info("Disconnecting from storm cluster state context");
+                workerState.stormClusterState.disconnect();
+                workerState.stateStorage.close();
+            } else {
+                LOG.error("workerState is null");
             }
 
-            workerState.heartbeatTimer.close();
-            workerState.refreshConnectionsTimer.close();
-            workerState.refreshCredentialsTimer.close();
-            workerState.checkForUpdatedBlobsTimer.close();
-            workerState.refreshActiveTimer.close();
-            workerState.executorHeartbeatTimer.close();
-            workerState.userTimer.close();
-            workerState.refreshLoadTimer.close();
-            workerState.resetLogLevelsTimer.close();
-            workerState.flushTupleTimer.close();
-            workerState.backPressureCheckTimer.close();
-            
-            // this is fine because the only time this is shared is when it's a local context,
-            // in which case it's a noop
-            workerState.mqContext.term();
-            
-            workerState.closeResources();
-
             metricRegistry.stop();
 
-            LOG.info("Trigger any worker shutdown hooks");
-            workerState.runWorkerShutdownHooks();
-
-            workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port);
-            LOG.info("Disconnecting from storm cluster state context");
-            workerState.stormClusterState.disconnect();
-            workerState.stateStorage.close();
             LOG.info("Shut down worker {} {} {}", topologyId, assignmentId, port);
         } catch (Exception ex) {
             throw Utils.wrapInRuntime(ex);
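
The null guard above is needed because, with the hook now registered before `worker.start()`, the JVM may run the shutdown hook before `start()` has populated `workerState`. A minimal sketch of the same defensive pattern, using a hypothetical `Resource` class in place of `WorkerState`:

```java
public final class GuardedShutdown {

    private volatile Resource resource; // assigned by start(); null until then

    void start() {
        resource = new Resource();
    }

    void shutdown() {
        Resource r = resource; // read the volatile field once
        if (r != null) {
            r.close(); // full cleanup path, like the workerState block above
        } else {
            // The hook fired before start() got this far; nothing to clean up.
            System.err.println("resource is null; skipping resource cleanup");
        }
        // Cleanup independent of late-initialized state (metrics, etc.) still runs here.
    }

    // Hypothetical stand-in for WorkerState.
    static final class Resource {
        void close() { /* release sockets, timers, ... */ }
    }
}
```
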
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 2a6d561..af2d981 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -97,7 +97,14 @@
             }
             for (Utils.SmartThread t : threads) {
                 LOG.debug("Executor " + executor.getComponentId() + ":" + executor.getExecutorId() + " joining thread " + t.getName());
-                t.join();
+                // Don't wait forever: when the executor thread (t) is itself the one running
+                // the shutdown hook (which invokes Worker::shutdown), an unbounded join here
+                // would deadlock with that thread. See STORM-3658.
+                long waitMs = 100;
+                t.join(waitMs);
+                if (t.isAlive()) {
+                    LOG.warn("Thread {} is still alive ({} ms after interruption). Stop waiting for it.", t.getName(), waitMs);
+                }
             }
             executor.getStats().cleanupStats();
             for (Task task : taskDatas) {
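
The bounded join above breaks a join cycle: a thread that calls `System.exit()` blocks inside the JVM's shutdown sequence until every hook finishes, so an unbounded `t.join()` in the hook deadlocks when `t` is the very thread that triggered the exit. A minimal standalone reproduction of that cycle and the bounded-join escape (thread names are illustrative):

```java
public final class BoundedJoinDemo {

    public static void main(String[] args) {
        Thread worker = new Thread(
            // Simulates an executor thread deciding to stop the process:
            // System.exit() blocks this thread until all shutdown hooks finish.
            () -> System.exit(0),
            "worker");

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            worker.interrupt();
            try {
                // An unbounded worker.join() would never return here: the worker
                // is parked inside System.exit() waiting for this very hook.
                long waitMs = 100;
                worker.join(waitMs);
                if (worker.isAlive()) {
                    System.err.println("worker still alive after " + waitMs
                        + " ms; giving up waiting for it");
                }
            } catch (InterruptedException ignored) {
                // Restoring the interrupt flag is moot; the JVM is exiting.
            }
        }, "hook"));

        worker.start();
    }
}
```

Without the timeout, this demo hangs forever; with it, the process exits after roughly 100 ms.
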
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index fec30cc..aca356a 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -348,13 +348,13 @@
             } catch (Exception e) {
                 LOG.warn("Exception in the ShutDownHook", e);
             }
-        });
+        }, "ShutdownHook-sleepKill-" + numSecs + "s");
         sleepKill.setDaemon(true);
-        Thread wrappedFunc = new Thread(() -> {
+        Thread shutdownFunc = new Thread(() -> {
             func.run();
             sleepKill.interrupt();
-        });
-        Runtime.getRuntime().addShutdownHook(wrappedFunc);
+        }, "ShutdownHook-shutdownFunc");
+        Runtime.getRuntime().addShutdownHook(shutdownFunc);
         Runtime.getRuntime().addShutdownHook(sleepKill);
     }
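
For context, the renamed threads implement a delayed force-kill: the first hook runs the orderly shutdown and, on success, cancels the second, which otherwise halts the JVM after `numSecs`. A minimal self-contained sketch of the pattern (the `halt(20)` exit code is an assumption for this sketch, not taken from the diff):

```java
public final class DelayedForceKill {

    /** Registers func as a shutdown hook, backed by a watchdog that halts
     *  the JVM if func has not completed within numSecs. */
    public static void addShutdownHookWithDelayedForceKill(Runnable func, int numSecs) {
        Thread sleepKill = new Thread(() -> {
            try {
                Thread.sleep(numSecs * 1000L);
                // Still alive: orderly shutdown overran its budget; halt hard.
                // (Exit code 20 is an assumption for this sketch.)
                Runtime.getRuntime().halt(20);
            } catch (InterruptedException e) {
                // Normal path: shutdownFunc finished and cancelled the watchdog.
            }
        }, "ShutdownHook-sleepKill-" + numSecs + "s");
        sleepKill.setDaemon(true);

        Thread shutdownFunc = new Thread(() -> {
            func.run();
            sleepKill.interrupt(); // cancel the force-kill once cleanup is done
        }, "ShutdownHook-shutdownFunc");

        Runtime.getRuntime().addShutdownHook(shutdownFunc);
        Runtime.getRuntime().addShutdownHook(sleepKill);
    }
}
```

Naming both hook threads, as the hunk does, also makes them easy to identify in a thread dump when a shutdown hangs.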