[SPARK-47833][SQL][CORE] Supply caller stacktrace for checkAndGlobPathIfNecessary AnalysisException
### What changes were proposed in this pull request?
SPARK-29089 parallelized `checkAndGlobPathIfNecessary` by leveraging a ForkJoinPool, but it also introduced a side effect: if something goes wrong, the reported error message loses the caller-side stack trace.
For example, I hit the following error on a Spark job and had no idea what happened without the caller-side stack trace.
```
2024-04-12 14:31:21 CST ApplicationMaster INFO - Final app status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://xyz-cluster/user/abc/hive_db/tmp.db/tmp_lskkh_1
at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1011)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:785)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:782)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
)
```
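The root cause is not specific to `checkAndGlobPathIfNecessary`: any exception raised on a ForkJoinPool worker carries only the worker thread's frames, so the awaiting caller's frames never appear. A minimal Spark-independent sketch of the problem (all names here are illustrative, not from the patch):
```scala
import java.util.concurrent.ForkJoinPool

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object StackTraceLossDemo {
  private def checkPath(): Unit =
    throw new RuntimeException("Path does not exist: hdfs://example/path")

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutorService(new ForkJoinPool(2))
    try {
      // checkPath() runs and fails on a ForkJoinPool worker thread.
      Await.result(Future(checkPath()), Duration.Inf)
    } catch {
      // The rethrown exception carries only worker-side frames (checkPath,
      // Future internals, ForkJoinPool internals); `main` appears nowhere,
      // so the call site is lost, which matches the log above.
      case e: Throwable => e.printStackTrace()
    }
  }
}
```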
### Why are the changes needed?
Improve the error message by preserving the caller-side stack trace.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
A new UT is added, and the exception stack trace differences are:
Raw stack trace
```
java.lang.RuntimeException: Error occurred on Thread-9
at org.apache.spark.util.ThreadUtilsSuite$$anon$3.internalMethod(ThreadUtilsSuite.scala:141)
at org.apache.spark.util.ThreadUtilsSuite$$anon$3.run(ThreadUtilsSuite.scala:138)
```
Enhanced exception stack trace
```
java.lang.RuntimeException: Error occurred on Thread-9
at org.apache.spark.util.ThreadUtilsSuite$$anon$3.internalMethod(ThreadUtilsSuite.scala:141)
at org.apache.spark.util.ThreadUtilsSuite$$anon$3.run(ThreadUtilsSuite.scala:138)
at ... run in separate thread: Thread-9 ... ()
at org.apache.spark.util.ThreadUtilsSuite.$anonfun$new$16(ThreadUtilsSuite.scala:151)
at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
(... other scalatest callsites)
```
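For reference, a rough sketch of how a caller applies the new helper around `ThreadUtils.parmap`; the package, object name, and `doWork` below are hypothetical, while the catch clause mirrors the `DataSource.scala` change in the diff:
```scala
// ThreadUtils is private[spark], so this sketch must live under org.apache.spark.
package org.apache.spark.demo

import org.apache.spark.SparkException
import org.apache.spark.util.ThreadUtils

object WrapCallerDemo {
  // Hypothetical worker function that fails for one input.
  private def doWork(path: String): String =
    if (path.endsWith("/b")) throw new RuntimeException(s"Path does not exist: $path")
    else path

  def main(args: Array[String]): Unit = {
    val paths = Seq("hdfs://cluster/a", "hdfs://cluster/b")
    try {
      ThreadUtils.parmap(paths, "check-paths", maxThreads = 2)(doWork)
    } catch {
      // parmap wraps worker-side failures in a SparkException; unwrap the
      // cause and graft the caller-side frames onto it before rethrowing,
      // so `main` shows up below the "... run in separate thread ..." marker.
      case e: SparkException => throw ThreadUtils.wrapCallerStacktrace(e.getCause)
    }
  }
}
```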
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #46028 from pan3793/SPARK-47833.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index e4167c4..7f61b3f 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -231,7 +231,7 @@
/**
* Run a piece of code in a new thread and return the result. Exception in the new thread is
* thrown in the caller thread with an adjusted stack trace that removes references to this
- * method for clarity. The exception stack traces will be like the following
+ * method for clarity. The exception stack traces will be like the following:
*
* SomeException: exception-message
* at CallerClass.body-method (sourcefile.scala)
@@ -261,32 +261,52 @@
exception match {
case Some(realException) =>
- // Remove the part of the stack that shows method calls into this helper method
- // This means drop everything from the top until the stack element
- // ThreadUtils.runInNewThread(), and then drop that as well (hence the `drop(1)`).
- val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
- ! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)
-
- // Remove the part of the new thread stack that shows methods call from this helper method
- val extraStackTrace = realException.getStackTrace.takeWhile(
- ! _.getClassName.contains(this.getClass.getSimpleName))
-
- // Combine the two stack traces, with a place holder just specifying that there
- // was a helper method used, without any further details of the helper
- val placeHolderStackElem = new StackTraceElement(
- s"... run in separate thread using ${ThreadUtils.getClass.getName.stripSuffix("$")} ..",
- " ", "", -1)
- val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace
-
- // Update the stack trace and rethrow the exception in the caller thread
- realException.setStackTrace(finalStackTrace)
- throw realException
+ throw wrapCallerStacktrace(realException, dropStacks = 2)
case None =>
result
}
}
/**
+ * Adjust the exception stack trace to include the caller-side thread stack trace.
+ * The exception stack traces will be like the following:
+ *
+ * SomeException: exception-message
+ * at CallerClass.body-method (sourcefile.scala)
+ * at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
+ * at CallerClass.caller-method (sourcefile.scala)
+ * ...
+ */
+ def wrapCallerStacktrace[T <: Throwable](
+ realException: T,
+ combineMessage: String =
+ s"run in separate thread using ${ThreadUtils.getClass.getName.stripSuffix("$")}",
+ dropStacks: Int = 1): T = {
+ require(dropStacks >= 0, "dropStacks must be zero or positive")
+ val simpleName = this.getClass.getSimpleName
+ // Remove the part of the stack that shows method calls into this helper method.
+ // This means drop everything from the top until the stack element
+ // ThreadUtils.wrapCallerStacktrace(), and then drop that element as well
+ // (hence the default `dropStacks = 1`; a larger value drops more caller frames).
+ val baseStackTrace = Thread.currentThread().getStackTrace
+ .dropWhile(!_.getClassName.contains(simpleName))
+ .drop(dropStacks)
+
+ // Remove the part of the new thread stack that shows methods called from this helper method
+ val extraStackTrace = realException.getStackTrace
+ .takeWhile(!_.getClassName.contains(simpleName))
+
+ // Combine the two stack traces, with a placeholder specifying only that a
+ // helper method was used, without any further details of the helper
+ val placeHolderStackElem = new StackTraceElement(s"... $combineMessage ..", " ", "", -1)
+ val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace
+
+ // Update the stack trace and return the exception for the caller to rethrow
+ realException.setStackTrace(finalStackTrace)
+ realException
+ }
+
+ /**
* Construct a new ForkJoinPool with a specified max parallelism and name prefix.
*/
def newForkJoinPool(prefix: String, maxThreadNumber: Int): ForkJoinPool = {
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index d907fe1..04f661d 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -119,11 +119,46 @@
runInNewThread("thread-name") { throw new IllegalArgumentException(uniqueExceptionMessage) }
}
assert(exception.getMessage === uniqueExceptionMessage)
- assert(exception.getStackTrace.mkString("\n").contains(
+ val stacktrace = exception.getStackTrace.mkString("\n")
+ assert(stacktrace.contains(
"... run in separate thread using org.apache.spark.util.ThreadUtils ..."),
"stack trace does not contain expected place holder"
)
- assert(exception.getStackTrace.mkString("\n").contains("ThreadUtils.scala") === false,
+ assert(!stacktrace.contains("ThreadUtils.scala"),
+ "stack trace contains unexpected references to ThreadUtils"
+ )
+ }
+
+ test("SPARK-47833: wrapCallerStacktrace") {
+ var runnerThreadName: String = null
+ var exception: Throwable = null
+ val t = new Thread() {
+ override def run(): Unit = {
+ runnerThreadName = Thread.currentThread().getName
+ internalMethod()
+ }
+ private def internalMethod(): Unit = {
+ throw new RuntimeException(s"Error occurred on $runnerThreadName")
+ }
+ }
+ t.setDaemon(true)
+ t.setUncaughtExceptionHandler { case (_, e) => exception = e }
+ t.start()
+ t.join()
+
+ ThreadUtils.wrapCallerStacktrace(exception, s"run in separate thread: $runnerThreadName")
+
+ val stacktrace = exception.getStackTrace.mkString("\n")
+ assert(stacktrace.contains("internalMethod"),
+ "stack trace does not contain real exception stack trace"
+ )
+ assert(stacktrace.contains(s"... run in separate thread: $runnerThreadName ..."),
+ "stack trace does not contain expected place holder"
+ )
+ assert(stacktrace.contains("org.scalatest.Suite.run"),
+ "stack trace does not contain caller stack trace"
+ )
+ assert(!stacktrace.contains("ThreadUtils.scala"),
"stack trace contains unexpected references to ThreadUtils"
)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 84143a3..1936167 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -786,7 +786,7 @@
globResult
}.flatten
} catch {
- case e: SparkException => throw e.getCause
+ case e: SparkException => throw ThreadUtils.wrapCallerStacktrace(e.getCause)
}
if (checkFilesExist) {
@@ -798,7 +798,7 @@
}
}
} catch {
- case e: SparkException => throw e.getCause
+ case e: SparkException => throw ThreadUtils.wrapCallerStacktrace(e.getCause)
}
}