[STORM-3658] Avoid deadlock and race condition caused by shutdown hooks (#3299)
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index f703eda..b0075d9 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -152,10 +152,14 @@
StormCommon.validateDistributedMode(conf);
int supervisorPortInt = Integer.parseInt(supervisorPort);
Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
- worker.start();
+
+ //Register the shutdown hooks before starting any other threads to avoid a possible race condition
+ //between shutdown hooks being invoked and shutdown hooks being registered. See STORM-3658.
int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
Utils.addShutdownHookWithDelayedForceKill(worker::shutdown, workerShutdownSleepSecs);
+
+ worker.start();
}
public void start() throws Exception {
@@ -477,54 +481,59 @@
try {
LOG.info("Shutting down worker {} {} {}", topologyId, assignmentId, port);
- for (IConnection socket : workerState.cachedNodeToPortSocket.get().values()) {
- //this will do best effort flushing since the linger period
- // was set on creation
- socket.close();
- }
- LOG.info("Terminating messaging context");
- LOG.info("Shutting down executors");
- for (IRunningExecutor executor : executorsAtom.get()) {
- ((ExecutorShutdown) executor).shutdown();
- }
- LOG.info("Shut down executors");
+ if (workerState != null) {
+ for (IConnection socket : workerState.cachedNodeToPortSocket.get().values()) {
+ //this will do best effort flushing since the linger period
+ // was set on creation
+ socket.close();
+ }
+ LOG.info("Terminating messaging context");
+ LOG.info("Shutting down executors");
+ for (IRunningExecutor executor : executorsAtom.get()) {
+ ((ExecutorShutdown) executor).shutdown();
+ }
+ LOG.info("Shut down executors");
- LOG.info("Shutting down transfer thread");
- workerState.haltWorkerTransfer();
+ LOG.info("Shutting down transfer thread");
+ workerState.haltWorkerTransfer();
- if (transferThread != null) {
- transferThread.interrupt();
- transferThread.join();
- LOG.info("Shut down transfer thread");
+ if (transferThread != null) {
+ transferThread.interrupt();
+ transferThread.join();
+ LOG.info("Shut down transfer thread");
+ }
+
+ workerState.heartbeatTimer.close();
+ workerState.refreshConnectionsTimer.close();
+ workerState.refreshCredentialsTimer.close();
+ workerState.checkForUpdatedBlobsTimer.close();
+ workerState.refreshActiveTimer.close();
+ workerState.executorHeartbeatTimer.close();
+ workerState.userTimer.close();
+ workerState.refreshLoadTimer.close();
+ workerState.resetLogLevelsTimer.close();
+ workerState.flushTupleTimer.close();
+ workerState.backPressureCheckTimer.close();
+
+ // this is fine because the only time this is shared is when it's a local context,
+ // in which case it's a noop
+ workerState.mqContext.term();
+
+ workerState.closeResources();
+
+ LOG.info("Trigger any worker shutdown hooks");
+ workerState.runWorkerShutdownHooks();
+
+ workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port);
+ LOG.info("Disconnecting from storm cluster state context");
+ workerState.stormClusterState.disconnect();
+ workerState.stateStorage.close();
+ } else {
+ LOG.error("workerState is null");
}
- workerState.heartbeatTimer.close();
- workerState.refreshConnectionsTimer.close();
- workerState.refreshCredentialsTimer.close();
- workerState.checkForUpdatedBlobsTimer.close();
- workerState.refreshActiveTimer.close();
- workerState.executorHeartbeatTimer.close();
- workerState.userTimer.close();
- workerState.refreshLoadTimer.close();
- workerState.resetLogLevelsTimer.close();
- workerState.flushTupleTimer.close();
- workerState.backPressureCheckTimer.close();
-
- // this is fine because the only time this is shared is when it's a local context,
- // in which case it's a noop
- workerState.mqContext.term();
-
- workerState.closeResources();
-
metricRegistry.stop();
- LOG.info("Trigger any worker shutdown hooks");
- workerState.runWorkerShutdownHooks();
-
- workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port);
- LOG.info("Disconnecting from storm cluster state context");
- workerState.stormClusterState.disconnect();
- workerState.stateStorage.close();
LOG.info("Shut down worker {} {} {}", topologyId, assignmentId, port);
} catch (Exception ex) {
throw Utils.wrapInRuntime(ex);
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 2a6d561..af2d981 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -97,7 +97,14 @@
}
for (Utils.SmartThread t : threads) {
LOG.debug("Executor " + executor.getComponentId() + ":" + executor.getExecutorId() + " joining thread " + t.getName());
- t.join();
+ //Bound the wait instead of joining indefinitely.
+ //This avoids a deadlock between the executor thread (t) and the shutdown hook (which invokes Worker::shutdown)
+ //when the executor thread (t) is itself the thread that invoked the shutdown hook. See STORM-3658.
+ long waitMs = 100;
+ t.join(waitMs);
+ if (t.isAlive()) {
+ LOG.warn("Thread {} is still alive ({} ms after interruption). Stop waiting for it.", t.getName(), waitMs);
+ }
}
executor.getStats().cleanupStats();
for (Task task : taskDatas) {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index fec30cc..aca356a 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -348,13 +348,13 @@
} catch (Exception e) {
LOG.warn("Exception in the ShutDownHook", e);
}
- });
+ }, "ShutdownHook-sleepKill-" + numSecs + "s");
sleepKill.setDaemon(true);
- Thread wrappedFunc = new Thread(() -> {
+ Thread shutdownFunc = new Thread(() -> {
func.run();
sleepKill.interrupt();
- });
- Runtime.getRuntime().addShutdownHook(wrappedFunc);
+ }, "ShutdownHook-shutdownFunc");
+ Runtime.getRuntime().addShutdownHook(shutdownFunc);
Runtime.getRuntime().addShutdownHook(sleepKill);
}