Merge pull request #3300 from bipinprasad/storm3659
[STORM-3659] Add StormVersion to OwnerTopologies table on User page
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 93c5499..c9cc554 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1676,10 +1676,9 @@
@IsPositiveNumber
public static final String EXECUTOR_METRICS_FREQUENCY_SECS = "executor.metrics.frequency.secs";
/**
- * How often a task should heartbeat its status to the master, deprecated for 2.0 RPC heartbeat reporting, see {@code
+ * How often a task should heartbeat its status to the Pacemaker. For 2.0 RPC heartbeat reporting, see {@code
* EXECUTOR_METRICS_FREQUENCY_SECS }.
*/
- @Deprecated
@IsInteger
@IsPositiveNumber
public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 94f0c86..412c761 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -75,6 +75,13 @@
boolean isAssignmentsBackendSynchronized();
/**
+ * Flag to indicate whether Pacemaker is the backend state store.
+ *
+ * @return true if Pacemaker is being used as StateStore
+ */
+ boolean isPacemakerStateStore();
+
+ /**
* Mark the assignments as synced successfully, see {@link #isAssignmentsBackendSynchronized()}.
*/
void setAssignmentsBackendSynchronized();
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 80d5fb0..99dd98b 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -229,6 +229,14 @@
}
@Override
+ public boolean isPacemakerStateStore() {
+ if (stateStorage == null) {
+ return false;
+ }
+ return stateStorage instanceof PaceMakerStateStorage;
+ }
+
+ @Override
public void setAssignmentsBackendSynchronized() {
this.assignmentsBackend.setSynchronized();
}
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 989d464..b0075d9 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -152,10 +152,14 @@
StormCommon.validateDistributedMode(conf);
int supervisorPortInt = Integer.parseInt(supervisorPort);
Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
- worker.start();
+
+ //Add shutdown hooks before starting any other threads to avoid a possible race condition
+ //between invoking shutdown hooks and registering shutdown hooks. See STORM-3658.
int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
Utils.addShutdownHookWithDelayedForceKill(worker::shutdown, workerShutdownSleepSecs);
+
+ worker.start();
}
public void start() throws Exception {
@@ -219,8 +223,11 @@
}
});
+ Integer execHeartBeatFreqSecs = workerState.stormClusterState.isPacemakerStateStore()
+ ? (Integer) conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS)
+ : (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS);
workerState.executorHeartbeatTimer
- .scheduleRecurring(0, (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS),
+ .scheduleRecurring(0, execHeartBeatFreqSecs,
Worker.this::doExecutorHeartbeats);
workerState.refreshConnections();
@@ -365,7 +372,10 @@
state.setWorkerHeartBeat(lsWorkerHeartbeat);
state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
// it shouldn't take supervisor 120 seconds between listing dir and reading it
- heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+ if (!workerState.stormClusterState.isPacemakerStateStore()) {
+ LOG.debug("The pacemaker is not used, send heartbeat to master.");
+ heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+ }
this.heatbeatMeter.mark();
}
@@ -447,6 +457,7 @@
if (ConfigUtils.isLocalMode(this.conf)) {
return;
}
+
//In distributed mode, send heartbeat directly to master if local supervisor goes down.
SupervisorWorkerHeartbeat workerHeartbeat = new SupervisorWorkerHeartbeat(lsWorkerHeartbeat.get_topology_id(),
lsWorkerHeartbeat.get_executors(),
@@ -470,54 +481,59 @@
try {
LOG.info("Shutting down worker {} {} {}", topologyId, assignmentId, port);
- for (IConnection socket : workerState.cachedNodeToPortSocket.get().values()) {
- //this will do best effort flushing since the linger period
- // was set on creation
- socket.close();
- }
- LOG.info("Terminating messaging context");
- LOG.info("Shutting down executors");
- for (IRunningExecutor executor : executorsAtom.get()) {
- ((ExecutorShutdown) executor).shutdown();
- }
- LOG.info("Shut down executors");
+ if (workerState != null) {
+ for (IConnection socket : workerState.cachedNodeToPortSocket.get().values()) {
+ //this will do best effort flushing since the linger period
+ // was set on creation
+ socket.close();
+ }
+ LOG.info("Terminating messaging context");
+ LOG.info("Shutting down executors");
+ for (IRunningExecutor executor : executorsAtom.get()) {
+ ((ExecutorShutdown) executor).shutdown();
+ }
+ LOG.info("Shut down executors");
- LOG.info("Shutting down transfer thread");
- workerState.haltWorkerTransfer();
+ LOG.info("Shutting down transfer thread");
+ workerState.haltWorkerTransfer();
- if (transferThread != null) {
- transferThread.interrupt();
- transferThread.join();
- LOG.info("Shut down transfer thread");
+ if (transferThread != null) {
+ transferThread.interrupt();
+ transferThread.join();
+ LOG.info("Shut down transfer thread");
+ }
+
+ workerState.heartbeatTimer.close();
+ workerState.refreshConnectionsTimer.close();
+ workerState.refreshCredentialsTimer.close();
+ workerState.checkForUpdatedBlobsTimer.close();
+ workerState.refreshActiveTimer.close();
+ workerState.executorHeartbeatTimer.close();
+ workerState.userTimer.close();
+ workerState.refreshLoadTimer.close();
+ workerState.resetLogLevelsTimer.close();
+ workerState.flushTupleTimer.close();
+ workerState.backPressureCheckTimer.close();
+
+ // this is fine because the only time this is shared is when it's a local context,
+ // in which case it's a noop
+ workerState.mqContext.term();
+
+ workerState.closeResources();
+
+ LOG.info("Trigger any worker shutdown hooks");
+ workerState.runWorkerShutdownHooks();
+
+ workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port);
+ LOG.info("Disconnecting from storm cluster state context");
+ workerState.stormClusterState.disconnect();
+ workerState.stateStorage.close();
+ } else {
+ LOG.error("workerState is null");
}
- workerState.heartbeatTimer.close();
- workerState.refreshConnectionsTimer.close();
- workerState.refreshCredentialsTimer.close();
- workerState.checkForUpdatedBlobsTimer.close();
- workerState.refreshActiveTimer.close();
- workerState.executorHeartbeatTimer.close();
- workerState.userTimer.close();
- workerState.refreshLoadTimer.close();
- workerState.resetLogLevelsTimer.close();
- workerState.flushTupleTimer.close();
- workerState.backPressureCheckTimer.close();
-
- // this is fine because the only time this is shared is when it's a local context,
- // in which case it's a noop
- workerState.mqContext.term();
-
- workerState.closeResources();
-
metricRegistry.stop();
- LOG.info("Trigger any worker shutdown hooks");
- workerState.runWorkerShutdownHooks();
-
- workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port);
- LOG.info("Disconnecting from storm cluster state context");
- workerState.stormClusterState.disconnect();
- workerState.stateStorage.close();
LOG.info("Shut down worker {} {} {}", topologyId, assignmentId, port);
} catch (Exception ex) {
throw Utils.wrapInRuntime(ex);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index db0c0e5..8e5af6a 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -168,7 +168,6 @@
this.autoCredentials = autoCredentials;
this.conf = conf;
this.supervisorIfaceSupplier = supervisorIfaceSupplier;
- this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
this.topologyId = topologyId;
this.assignmentId = assignmentId;
@@ -176,6 +175,8 @@
this.workerId = workerId;
this.stateStorage = stateStorage;
this.stormClusterState = stormClusterState;
+ this.localExecutors =
+ new HashSet<>(readWorkerExecutors(assignmentId, port, getLocalAssignment(this.stormClusterState, topologyId)));
this.isWorkerActive = new CountDownLatch(1);
this.isTopologyActive = new AtomicBoolean(false);
this.stormComponentToDebug = new AtomicReference<>();
@@ -374,7 +375,23 @@
public SmartThread makeTransferThread() {
return workerTransfer.makeTransferThread();
}
-
+
+ public void suicideIfLocalAssignmentsChanged(Assignment assignment) {
+ if (assignment != null) {
+ Set<List<Long>> assignedExecutors = new HashSet<>(readWorkerExecutors(assignmentId, port, assignment));
+ if (!localExecutors.equals(assignedExecutors)) {
+ LOG.info("Found conflicting assignments. We shouldn't be alive!"
+ + " Assigned: " + assignedExecutors + ", Current: "
+ + localExecutors);
+ if (!ConfigUtils.isLocalMode(conf)) {
+ suicideCallback.run();
+ } else {
+ LOG.info("Local worker tried to commit suicide!");
+ }
+ }
+ }
+ }
+
public void refreshConnections() {
Assignment assignment = null;
try {
@@ -383,6 +400,7 @@
LOG.warn("Failed to read assignment. This should only happen when topology is shutting down.", e);
}
+ suicideIfLocalAssignmentsChanged(assignment);
Set<NodeInfo> neededConnections = new HashSet<>();
Map<Integer, NodeInfo> newTaskToNodePort = new HashMap<>();
if (null != assignment) {
@@ -631,13 +649,11 @@
return this.autoCredentials;
}
- private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId,
- int port) {
- LOG.info("Reading assignments");
+ private List<List<Long>> readWorkerExecutors(String assignmentId, int port, Assignment assignment) {
List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
Map<List<Long>, NodeInfo> executorToNodePort =
- getLocalAssignment(stormClusterState, topologyId).get_executor_node_port();
+ assignment.get_executor_node_port();
for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
NodeInfo nodeInfo = entry.getValue();
if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 2a6d561..af2d981 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -97,7 +97,14 @@
}
for (Utils.SmartThread t : threads) {
LOG.debug("Executor " + executor.getComponentId() + ":" + executor.getExecutorId() + " joining thread " + t.getName());
- t.join();
+ //Don't wait forever.
+ //This is to avoid a deadlock between the executor thread (t) and the shutdown hook (which invokes Worker::shutdown)
+ //when it is the executor thread (t) that invokes the shutdown hook. See STORM-3658.
+ long waitMs = 100;
+ t.join(waitMs);
+ if (t.isAlive()) {
+ LOG.warn("Thread {} is still alive ({} ms after interruption). Stop waiting for it.", t.getName(), waitMs);
+ }
}
executor.getStats().cleanupStats();
for (Task task : taskDatas) {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index fec30cc..aca356a 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -348,13 +348,13 @@
} catch (Exception e) {
LOG.warn("Exception in the ShutDownHook", e);
}
- });
+ }, "ShutdownHook-sleepKill-" + numSecs + "s");
sleepKill.setDaemon(true);
- Thread wrappedFunc = new Thread(() -> {
+ Thread shutdownFunc = new Thread(() -> {
func.run();
sleepKill.interrupt();
- });
- Runtime.getRuntime().addShutdownHook(wrappedFunc);
+ }, "ShutdownHook-shutdownFunc");
+ Runtime.getRuntime().addShutdownHook(shutdownFunc);
Runtime.getRuntime().addShutdownHook(sleepKill);
}
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index e14fb29..b9b162c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -2314,6 +2314,11 @@
}
private boolean supportRpcHeartbeat(TopologyDetails topo) {
+ if (stormClusterState.isPacemakerStateStore()) {
+ // While using PacemakerStateStorage, ignore RPC heartbeat.
+ return false;
+ }
+
if (!topo.getTopology().is_set_storm_version()) {
// current version supports RPC heartbeat
return true;
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
index 75d54be..b33e5a8 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
@@ -87,6 +87,10 @@
// error/exception thrown, just skip
return;
}
+ if (supervisor.getStormClusterState().isPacemakerStateStore()) {
+ LOG.debug("Worker are using pacemaker to send worker heartbeats so skip reporting by supervisor.");
+ return;
+ }
// if it is local mode, just get the local nimbus instance and set the heartbeats
if (ConfigUtils.isLocalMode(conf)) {
try {