Merge pull request #3300 from bipinprasad/storm3659

[STORM-3659] Add StormVersion to OwnerTopologies table on User page
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 93c5499..c9cc554 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1676,10 +1676,9 @@
     @IsPositiveNumber
     public static final String EXECUTOR_METRICS_FREQUENCY_SECS = "executor.metrics.frequency.secs";
     /**
-     * How often a task should heartbeat its status to the master, deprecated for 2.0 RPC heartbeat reporting, see {@code
+     * How often a task should heartbeat its status to Pacemaker. For 2.0 RPC heartbeat reporting, see {@code
      * EXECUTOR_METRICS_FREQUENCY_SECS }.
      */
-    @Deprecated
     @IsInteger
     @IsPositiveNumber
     public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 94f0c86..412c761 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -75,6 +75,13 @@
     boolean isAssignmentsBackendSynchronized();
 
     /**
+     * Flag to indicate whether Pacemaker is the backend state store.
+     *
+     * @return true if Pacemaker is being used as StateStore
+     */
+    boolean isPacemakerStateStore();
+
+    /**
      * Mark the assignments as synced successfully, see {@link #isAssignmentsBackendSynchronized()}.
      */
     void setAssignmentsBackendSynchronized();
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 80d5fb0..99dd98b 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -229,6 +229,14 @@
     }
 
     @Override
+    public boolean isPacemakerStateStore() {
+        if (stateStorage == null) {
+            return false;
+        }
+        return stateStorage instanceof PaceMakerStateStorage;
+    }
+
+    @Override
     public void setAssignmentsBackendSynchronized() {
         this.assignmentsBackend.setSynchronized();
     }
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 989d464..b0075d9 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -152,10 +152,14 @@
         StormCommon.validateDistributedMode(conf);
         int supervisorPortInt = Integer.parseInt(supervisorPort);
         Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
-        worker.start();
+
+        //Add shutdown hooks before starting any other threads to avoid possible race condition
+        //between invoking shutdown hooks and registering shutdown hooks. See STORM-3658.
         int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
         LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
         Utils.addShutdownHookWithDelayedForceKill(worker::shutdown, workerShutdownSleepSecs);
+
+        worker.start();
     }
 
     public void start() throws Exception {
@@ -219,8 +223,11 @@
                 }
             });
 
+        Integer execHeartBeatFreqSecs = workerState.stormClusterState.isPacemakerStateStore()
+            ? (Integer) conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS)
+            : (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS);
         workerState.executorHeartbeatTimer
-            .scheduleRecurring(0, (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS),
+            .scheduleRecurring(0, execHeartBeatFreqSecs,
                                Worker.this::doExecutorHeartbeats);
 
         workerState.refreshConnections();
@@ -365,7 +372,10 @@
         state.setWorkerHeartBeat(lsWorkerHeartbeat);
         state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
         // it shouldn't take supervisor 120 seconds between listing dir and reading it
-        heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+        if (!workerState.stormClusterState.isPacemakerStateStore()) {
+            LOG.debug("The pacemaker is not used, send heartbeat to master.");
+            heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+        }
         this.heatbeatMeter.mark();
     }
 
@@ -447,6 +457,7 @@
         if (ConfigUtils.isLocalMode(this.conf)) {
             return;
         }
+
         //In distributed mode, send heartbeat directly to master if local supervisor goes down.
         SupervisorWorkerHeartbeat workerHeartbeat = new SupervisorWorkerHeartbeat(lsWorkerHeartbeat.get_topology_id(),
                                                                                   lsWorkerHeartbeat.get_executors(),
@@ -470,54 +481,59 @@
         try {
             LOG.info("Shutting down worker {} {} {}", topologyId, assignmentId, port);
 
-            for (IConnection socket : workerState.cachedNodeToPortSocket.get().values()) {
-                //this will do best effort flushing since the linger period
-                // was set on creation
-                socket.close();
-            }
-            LOG.info("Terminating messaging context");
-            LOG.info("Shutting down executors");
-            for (IRunningExecutor executor : executorsAtom.get()) {
-                ((ExecutorShutdown) executor).shutdown();
-            }
-            LOG.info("Shut down executors");
+            if (workerState != null) {
+                for (IConnection socket : workerState.cachedNodeToPortSocket.get().values()) {
+                    //this will do best effort flushing since the linger period
+                    // was set on creation
+                    socket.close();
+                }
+                LOG.info("Terminating messaging context");
+                LOG.info("Shutting down executors");
+                for (IRunningExecutor executor : executorsAtom.get()) {
+                    ((ExecutorShutdown) executor).shutdown();
+                }
+                LOG.info("Shut down executors");
 
-            LOG.info("Shutting down transfer thread");
-            workerState.haltWorkerTransfer();
+                LOG.info("Shutting down transfer thread");
+                workerState.haltWorkerTransfer();
 
-            if (transferThread != null) {
-                transferThread.interrupt();
-                transferThread.join();
-                LOG.info("Shut down transfer thread");
+                if (transferThread != null) {
+                    transferThread.interrupt();
+                    transferThread.join();
+                    LOG.info("Shut down transfer thread");
+                }
+
+                workerState.heartbeatTimer.close();
+                workerState.refreshConnectionsTimer.close();
+                workerState.refreshCredentialsTimer.close();
+                workerState.checkForUpdatedBlobsTimer.close();
+                workerState.refreshActiveTimer.close();
+                workerState.executorHeartbeatTimer.close();
+                workerState.userTimer.close();
+                workerState.refreshLoadTimer.close();
+                workerState.resetLogLevelsTimer.close();
+                workerState.flushTupleTimer.close();
+                workerState.backPressureCheckTimer.close();
+
+                // this is fine because the only time this is shared is when it's a local context,
+                // in which case it's a noop
+                workerState.mqContext.term();
+
+                workerState.closeResources();
+
+                LOG.info("Trigger any worker shutdown hooks");
+                workerState.runWorkerShutdownHooks();
+
+                workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port);
+                LOG.info("Disconnecting from storm cluster state context");
+                workerState.stormClusterState.disconnect();
+                workerState.stateStorage.close();
+            } else {
+                LOG.error("workerState is null");
             }
 
-            workerState.heartbeatTimer.close();
-            workerState.refreshConnectionsTimer.close();
-            workerState.refreshCredentialsTimer.close();
-            workerState.checkForUpdatedBlobsTimer.close();
-            workerState.refreshActiveTimer.close();
-            workerState.executorHeartbeatTimer.close();
-            workerState.userTimer.close();
-            workerState.refreshLoadTimer.close();
-            workerState.resetLogLevelsTimer.close();
-            workerState.flushTupleTimer.close();
-            workerState.backPressureCheckTimer.close();
-            
-            // this is fine because the only time this is shared is when it's a local context,
-            // in which case it's a noop
-            workerState.mqContext.term();
-            
-            workerState.closeResources();
-
             metricRegistry.stop();
 
-            LOG.info("Trigger any worker shutdown hooks");
-            workerState.runWorkerShutdownHooks();
-
-            workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port);
-            LOG.info("Disconnecting from storm cluster state context");
-            workerState.stormClusterState.disconnect();
-            workerState.stateStorage.close();
             LOG.info("Shut down worker {} {} {}", topologyId, assignmentId, port);
         } catch (Exception ex) {
             throw Utils.wrapInRuntime(ex);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index db0c0e5..8e5af6a 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -168,7 +168,6 @@
         this.autoCredentials = autoCredentials;
         this.conf = conf;
         this.supervisorIfaceSupplier = supervisorIfaceSupplier;
-        this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
         this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
         this.topologyId = topologyId;
         this.assignmentId = assignmentId;
@@ -176,6 +175,8 @@
         this.workerId = workerId;
         this.stateStorage = stateStorage;
         this.stormClusterState = stormClusterState;
+        this.localExecutors =
+            new HashSet<>(readWorkerExecutors(assignmentId, port, getLocalAssignment(this.stormClusterState, topologyId)));
         this.isWorkerActive = new CountDownLatch(1);
         this.isTopologyActive = new AtomicBoolean(false);
         this.stormComponentToDebug = new AtomicReference<>();
@@ -374,7 +375,23 @@
     public SmartThread makeTransferThread() {
         return workerTransfer.makeTransferThread();
     }
-    
+
+    public void suicideIfLocalAssignmentsChanged(Assignment assignment) {
+        if (assignment != null) {
+            Set<List<Long>> assignedExecutors = new HashSet<>(readWorkerExecutors(assignmentId, port, assignment));
+            if (!localExecutors.equals(assignedExecutors)) {
+                LOG.info("Found conflicting assignments. We shouldn't be alive!"
+                         + " Assigned: " + assignedExecutors + ", Current: "
+                         + localExecutors);
+                if (!ConfigUtils.isLocalMode(conf)) {
+                    suicideCallback.run();
+                } else {
+                    LOG.info("Local worker tried to commit suicide!");
+                }
+            }
+        }
+    }
+
     public void refreshConnections() {
         Assignment assignment = null;
         try {
@@ -383,6 +400,7 @@
             LOG.warn("Failed to read assignment. This should only happen when topology is shutting down.", e);
         }
 
+        suicideIfLocalAssignmentsChanged(assignment);
         Set<NodeInfo> neededConnections = new HashSet<>();
         Map<Integer, NodeInfo> newTaskToNodePort = new HashMap<>();
         if (null != assignment) {
@@ -631,13 +649,11 @@
         return this.autoCredentials;
     }
 
-    private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId,
-                                                 int port) {
-        LOG.info("Reading assignments");
+    private List<List<Long>> readWorkerExecutors(String assignmentId, int port, Assignment assignment) {
         List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
         executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
         Map<List<Long>, NodeInfo> executorToNodePort = 
-            getLocalAssignment(stormClusterState, topologyId).get_executor_node_port();
+            assignment.get_executor_node_port();
         for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
             NodeInfo nodeInfo = entry.getValue();
             if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 2a6d561..af2d981 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -97,7 +97,14 @@
             }
             for (Utils.SmartThread t : threads) {
                 LOG.debug("Executor " + executor.getComponentId() + ":" + executor.getExecutorId() + " joining thread " + t.getName());
-                t.join();
+                //Don't wait forever.
+                //This is to avoid the deadlock between the executor thread (t) and the shutdown hook (which invokes Worker::shutdown)
+                //when it is the executor thread (t) who invokes the shutdown hook. See STORM-3658.
+                long waitMs = 100;
+                t.join(waitMs);
+                if (t.isAlive()) {
+                    LOG.warn("Thread {} is still alive ({} ms after interruption). Stop waiting for it.", t.getName(), waitMs);
+                }
             }
             executor.getStats().cleanupStats();
             for (Task task : taskDatas) {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index fec30cc..aca356a 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -348,13 +348,13 @@
             } catch (Exception e) {
                 LOG.warn("Exception in the ShutDownHook", e);
             }
-        });
+        }, "ShutdownHook-sleepKill-" + numSecs + "s");
         sleepKill.setDaemon(true);
-        Thread wrappedFunc = new Thread(() -> {
+        Thread shutdownFunc = new Thread(() -> {
             func.run();
             sleepKill.interrupt();
-        });
-        Runtime.getRuntime().addShutdownHook(wrappedFunc);
+        }, "ShutdownHook-shutdownFunc");
+        Runtime.getRuntime().addShutdownHook(shutdownFunc);
         Runtime.getRuntime().addShutdownHook(sleepKill);
     }
 
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index e14fb29..b9b162c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -2314,6 +2314,11 @@
     }
 
     private boolean supportRpcHeartbeat(TopologyDetails topo) {
+        if (stormClusterState.isPacemakerStateStore()) {
+            // While using PacemakerStateStorage, ignore RPC heartbeat.
+            return false;
+        }
+
         if (!topo.getTopology().is_set_storm_version()) {
             // current version supports RPC heartbeat
             return true;
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
index 75d54be..b33e5a8 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
@@ -87,6 +87,10 @@
             // error/exception thrown, just skip
             return;
         }
+        if (supervisor.getStormClusterState().isPacemakerStateStore()) {
+            LOG.debug("Worker are using pacemaker to send worker heartbeats so skip reporting by supervisor.");
+            return;
+        }
         // if it is local mode, just get the local nimbus instance and set the heartbeats
         if (ConfigUtils.isLocalMode(conf)) {
             try {