STORM-3662: Use Pacemaker if cluster has set up uses it (#3297)

diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 93c5499..c9cc554 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1676,10 +1676,9 @@
     @IsPositiveNumber
     public static final String EXECUTOR_METRICS_FREQUENCY_SECS = "executor.metrics.frequency.secs";
     /**
-     * How often a task should heartbeat its status to the master, deprecated for 2.0 RPC heartbeat reporting, see {@code
+     * How often a task should heartbeat its status to the Pacamker. For 2.0 RPC heartbeat reporting, see {@code
      * EXECUTOR_METRICS_FREQUENCY_SECS }.
      */
-    @Deprecated
     @IsInteger
     @IsPositiveNumber
     public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 94f0c86..412c761 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -75,6 +75,13 @@
     boolean isAssignmentsBackendSynchronized();
 
     /**
+     * Flag to indicate if the Pacameker is backend store.
+     *
+     * @return true if Pacemaker is being used as StateStore
+     */
+    boolean isPacemakerStateStore();
+
+    /**
      * Mark the assignments as synced successfully, see {@link #isAssignmentsBackendSynchronized()}.
      */
     void setAssignmentsBackendSynchronized();
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 80d5fb0..99dd98b 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -229,6 +229,14 @@
     }
 
     @Override
+    public boolean isPacemakerStateStore() {
+        if (stateStorage == null) {
+            return false;
+        }
+        return stateStorage instanceof PaceMakerStateStorage;
+    }
+
+    @Override
     public void setAssignmentsBackendSynchronized() {
         this.assignmentsBackend.setSynchronized();
     }
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 989d464..f703eda 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -219,8 +219,11 @@
                 }
             });
 
+        Integer execHeartBeatFreqSecs = workerState.stormClusterState.isPacemakerStateStore()
+            ? (Integer) conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS)
+            : (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS);
         workerState.executorHeartbeatTimer
-            .scheduleRecurring(0, (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS),
+            .scheduleRecurring(0, execHeartBeatFreqSecs,
                                Worker.this::doExecutorHeartbeats);
 
         workerState.refreshConnections();
@@ -365,7 +368,10 @@
         state.setWorkerHeartBeat(lsWorkerHeartbeat);
         state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
         // it shouldn't take supervisor 120 seconds between listing dir and reading it
-        heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+        if (!workerState.stormClusterState.isPacemakerStateStore()) {
+            LOG.debug("The pacemaker is not used, send heartbeat to master.");
+            heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+        }
         this.heatbeatMeter.mark();
     }
 
@@ -447,6 +453,7 @@
         if (ConfigUtils.isLocalMode(this.conf)) {
             return;
         }
+
         //In distributed mode, send heartbeat directly to master if local supervisor goes down.
         SupervisorWorkerHeartbeat workerHeartbeat = new SupervisorWorkerHeartbeat(lsWorkerHeartbeat.get_topology_id(),
                                                                                   lsWorkerHeartbeat.get_executors(),
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index e14fb29..b9b162c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -2314,6 +2314,11 @@
     }
 
     private boolean supportRpcHeartbeat(TopologyDetails topo) {
+        if (stormClusterState.isPacemakerStateStore()) {
+            // While using PacemakerStateStorage, ignore RPC heartbeat.
+            return false;
+        }
+
         if (!topo.getTopology().is_set_storm_version()) {
             // current version supports RPC heartbeat
             return true;
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
index 75d54be..b33e5a8 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
@@ -87,6 +87,10 @@
             // error/exception thrown, just skip
             return;
         }
+        if (supervisor.getStormClusterState().isPacemakerStateStore()) {
+            LOG.debug("Worker are using pacemaker to send worker heartbeats so skip reporting by supervisor.");
+            return;
+        }
         // if it is local mode, just get the local nimbus instance and set the heartbeats
         if (ConfigUtils.isLocalMode(conf)) {
             try {