STORM-3662: Use Pacemaker if cluster has set up uses it (#3297)
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 93c5499..c9cc554 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1676,10 +1676,9 @@
@IsPositiveNumber
public static final String EXECUTOR_METRICS_FREQUENCY_SECS = "executor.metrics.frequency.secs";
/**
- * How often a task should heartbeat its status to the master, deprecated for 2.0 RPC heartbeat reporting, see {@code
+ * How often a task should heartbeat its status to the Pacamker. For 2.0 RPC heartbeat reporting, see {@code
* EXECUTOR_METRICS_FREQUENCY_SECS }.
*/
- @Deprecated
@IsInteger
@IsPositiveNumber
public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 94f0c86..412c761 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -75,6 +75,13 @@
boolean isAssignmentsBackendSynchronized();
/**
+ * Flag to indicate if the Pacameker is backend store.
+ *
+ * @return true if Pacemaker is being used as StateStore
+ */
+ boolean isPacemakerStateStore();
+
+ /**
* Mark the assignments as synced successfully, see {@link #isAssignmentsBackendSynchronized()}.
*/
void setAssignmentsBackendSynchronized();
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 80d5fb0..99dd98b 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -229,6 +229,14 @@
}
@Override
+ public boolean isPacemakerStateStore() {
+ if (stateStorage == null) {
+ return false;
+ }
+ return stateStorage instanceof PaceMakerStateStorage;
+ }
+
+ @Override
public void setAssignmentsBackendSynchronized() {
this.assignmentsBackend.setSynchronized();
}
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 989d464..f703eda 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -219,8 +219,11 @@
}
});
+ Integer execHeartBeatFreqSecs = workerState.stormClusterState.isPacemakerStateStore()
+ ? (Integer) conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS)
+ : (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS);
workerState.executorHeartbeatTimer
- .scheduleRecurring(0, (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS),
+ .scheduleRecurring(0, execHeartBeatFreqSecs,
Worker.this::doExecutorHeartbeats);
workerState.refreshConnections();
@@ -365,7 +368,10 @@
state.setWorkerHeartBeat(lsWorkerHeartbeat);
state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
// it shouldn't take supervisor 120 seconds between listing dir and reading it
- heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+ if (!workerState.stormClusterState.isPacemakerStateStore()) {
+ LOG.debug("The pacemaker is not used, send heartbeat to master.");
+ heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+ }
this.heatbeatMeter.mark();
}
@@ -447,6 +453,7 @@
if (ConfigUtils.isLocalMode(this.conf)) {
return;
}
+
//In distributed mode, send heartbeat directly to master if local supervisor goes down.
SupervisorWorkerHeartbeat workerHeartbeat = new SupervisorWorkerHeartbeat(lsWorkerHeartbeat.get_topology_id(),
lsWorkerHeartbeat.get_executors(),
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index e14fb29..b9b162c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -2314,6 +2314,11 @@
}
private boolean supportRpcHeartbeat(TopologyDetails topo) {
+ if (stormClusterState.isPacemakerStateStore()) {
+ // While using PacemakerStateStorage, ignore RPC heartbeat.
+ return false;
+ }
+
if (!topo.getTopology().is_set_storm_version()) {
// current version supports RPC heartbeat
return true;
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
index 75d54be..b33e5a8 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
@@ -87,6 +87,10 @@
// error/exception thrown, just skip
return;
}
+ if (supervisor.getStormClusterState().isPacemakerStateStore()) {
+ LOG.debug("Worker are using pacemaker to send worker heartbeats so skip reporting by supervisor.");
+ return;
+ }
// if it is local mode, just get the local nimbus instance and set the heartbeats
if (ConfigUtils.isLocalMode(conf)) {
try {