[SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown
### What changes were proposed in this pull request?
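Fix a deadlock during executor shutdown: WorkerWatcher now calls System.exit(-1) inside a Future and waits at most 5 seconds for it, instead of calling it directly on the RPC thread.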
### Why are the changes needed?
When an executor receives a TERM signal (the second TERM signal), the thread handling that signal locks the java.lang.Shutdown class and then calls Shutdown.exit() to exit the JVM.
Shutdown then calls SparkShutdownHook to shut down the executor.
During the executor shutdown phase, a RemoteProcessDisconnected event is sent to the RPC inbox, and WorkerWatcher then tries to call System.exit(-1) again.
Because java.lang.Shutdown is already locked, that second call blocks and a deadlock occurs.
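The blocking pattern described above, and the bounded wait used to avoid it, can be illustrated with a minimal standalone sketch. It is not the Spark code: `shutdownLock` and `fakeExit` are hypothetical stand-ins for the lock on java.lang.Shutdown and for System.exit(-1), and the standard-library `Await.result` is used here as an assumption in place of Spark's `ThreadUtils.awaitResult`.

```scala
import java.util.concurrent.{CountDownLatch, TimeoutException}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BoundedExitSketch {
  // Hypothetical stand-in for the lock on java.lang.Shutdown.
  private val shutdownLock = new Object

  // Hypothetical stand-in for System.exit(-1): it must acquire shutdownLock.
  private def fakeExit(): Unit = shutdownLock.synchronized { () }

  /** Runs fakeExit on another thread and waits at most `timeout` for it.
    * Returns true if the call completed, false if the wait timed out. */
  def exitWithTimeout(timeout: FiniteDuration): Boolean =
    try {
      Await.result(Future { fakeExit() }, timeout)
      true
    } catch {
      case _: TimeoutException => false
    }

  /** Calls exitWithTimeout while another thread holds shutdownLock,
    * mimicking a shutdown that is already in progress. */
  def demo(): Boolean = {
    val lockHeld = new CountDownLatch(1)
    val release = new CountDownLatch(1)
    val holder = new Thread(new Runnable {
      override def run(): Unit = shutdownLock.synchronized {
        lockHeld.countDown() // signal that the lock is now held
        release.await()      // keep holding it until told to let go
      }
    })
    holder.setDaemon(true)
    holder.start()
    lockHeld.await()

    // A direct fakeExit() here would block until the lock is released;
    // the bounded wait gives up after the timeout and lets the caller continue.
    val completed = exitWithTimeout(200.millis)
    release.countDown()
    completed
  }
}
```

In this sketch `BoundedExitSketch.demo()` returns `false`, because the wait times out while the lock is held and the calling thread is free to proceed rather than blocking indefinitely.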
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Test case "task reaper kills JVM if killed tasks keep running for too long" in JobCancellationSuite
Closes #32868 from wankunde/SPARK-35714.
Authored-by: Kun Wan <wankun@apache.org>
Signed-off-by: Sean Owen <srowen@gmail.com>
(cherry picked from commit 69aa7ad11f68e96e045b5eb915e21708e018421a)
Signed-off-by: Sean Owen <srowen@gmail.com>
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 23efcab..43ec492 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -17,8 +17,13 @@
package org.apache.spark.deploy.worker
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
+import org.apache.spark.util.ThreadUtils
/**
* Endpoint which connects to a worker process and terminates the JVM if the
@@ -45,7 +50,14 @@
private val expectedAddress = RpcAddress.fromURIString(workerUrl)
private def isWorker(address: RpcAddress) = expectedAddress == address
- private def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
+ private def exitNonZero() =
+ if (isTesting) {
+ isShutDown = true
+ } else {
+ ThreadUtils.awaitResult(Future {
+ System.exit(-1)
+ }, 5.seconds)
+ }
override def receive: PartialFunction[Any, Unit] = {
case e => logWarning(s"Received unexpected message: $e")