[SPARK-48143][SQL] Use lightweight exceptions for control-flow between UnivocityParser and FailureSafeParser
### What changes were proposed in this pull request?
A new lightweight exception for control-flow between `UnivocityParser` and `FailureSafeParser` to speed up malformed CSV parsing.
This is a different way to implement these reverted changes: https://github.com/apache/spark/pull/46478
The previous implementation was more invasive: removing `cause` from `BadRecordException` could break code higher up the stack that unwraps errors and checks the types of their causes. This implementation only touches `FailureSafeParser` and `UnivocityParser`, since in the codebase they are always used together, unlike `JacksonParser` and `StaxXmlParser`. Removing the stacktrace from `BadRecordException` is safe, since the cause itself carries an adequate stacktrace (except in pure control-flow cases).
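The core trick can be sketched as follows. This is a minimal illustration of a stacktrace-less control-flow exception, with an illustrative class name that is not part of the patch:

```scala
// Minimal sketch: an exception intended purely for control flow.
// Overriding fillInStackTrace to return `this` without calling
// super.fillInStackTrace() skips the expensive stack walk that
// java.lang.Throwable performs during construction.
class ControlFlowException(message: String) extends Exception(message) {
  override def fillInStackTrace(): Throwable = this
}

val e = new ControlFlowException("bad record")
assert(e.getStackTrace.isEmpty) // no frames were ever collected
```

The JDK also supports this via the `Throwable` constructor's `writableStackTrace` flag, and Scala offers `scala.util.control.NoStackTrace`; the patch uses the override form directly.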
### Why are the changes needed?
Parsing in `PermissiveMode` is slow due to heavy exception construction (stacktrace filling and string template substitution in `SparkRuntimeException`).
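A rough micro-benchmark sketch of the stacktrace-filling cost (timings are machine-dependent, so none are shown; `CheapException` is an illustrative name, not from the patch):

```scala
// A variant that skips stack-trace collection on construction.
class CheapException extends Exception {
  override def fillInStackTrace(): Throwable = this
}

def timeMs(label: String)(body: => Unit): Unit = {
  val start = System.nanoTime()
  body
  println(s"$label: ${(System.nanoTime() - start) / 1000000} ms")
}

// Constructing a plain Exception walks and records the stack each time;
// the cheap variant does not, which dominates per-record parsing cost
// when every malformed record throws.
timeMs("with stacktrace")    { (1 to 100000).foreach(_ => new Exception("boom")) }
timeMs("without stacktrace") { (1 to 100000).foreach(_ => new CheapException) }
```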
### Does this PR introduce _any_ user-facing change?
No, since `FailureSafeParser` unwraps `BadRecordException` and correctly rethrows user-facing exceptions in `FailFastMode`.
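The unwrap-and-rethrow pattern can be sketched like this. All names here (`LazyCause`, `BadRecord`, `parseRecord`, `safeParse`) are hypothetical stand-ins for the real classes in the diff below:

```scala
// Wrapper that delays building the heavy user-facing cause until needed.
case class LazyCause(build: () => Throwable) extends Exception {
  override def fillInStackTrace(): Throwable = this // pure control flow
}
case class BadRecord(cause: Throwable) extends Exception(cause)

// Hypothetical parser that always fails, to exercise the handler.
def parseRecord(raw: String): String =
  throw BadRecord(LazyCause(() => new IllegalArgumentException(s"malformed: $raw")))

def safeParse(raw: String, failFast: Boolean): Option[String] =
  try Some(parseRecord(raw))
  catch {
    // FAILFAST: build and throw the real user-facing exception,
    // which carries a normal stacktrace.
    case BadRecord(lc: LazyCause) if failFast => throw lc.build()
    case BadRecord(cause) if failFast => throw cause
    // PERMISSIVE: tolerate the bad record; the cheap wrapper is never
    // exposed to users.
    case _: BadRecord => None
  }
```

In `PermissiveMode` the expensive cause is never constructed at all; in `FailFastMode` the user still sees the same exception type and stacktrace as before.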
### How was this patch tested?
- `testOnly org.apache.spark.sql.catalyst.csv.UnivocityParserSuite`
- Manually ran the CSV benchmark
- Manually checked valid and malformed CSV in `spark-shell` (`org.apache.spark.SparkException` is thrown with the stacktrace)
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #46500 from vladimirg-db/vladimirg-db/use-special-lighweight-exception-for-control-flow-between-univocity-parser-and-failure-safe-parser.
Authored-by: Vladimir Golubev <vladimir.golubev@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index a5158d8..4d95097 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -316,7 +316,7 @@
throw BadRecordException(
() => getCurrentInput,
() => Array.empty,
- QueryExecutionErrors.malformedCSVRecordError(""))
+ LazyBadRecordCauseWrapper(() => QueryExecutionErrors.malformedCSVRecordError("")))
}
val currentInput = getCurrentInput
@@ -326,7 +326,8 @@
// However, we still have chance to parse some of the tokens. It continues to parses the
// tokens normally and sets null when `ArrayIndexOutOfBoundsException` occurs for missing
// tokens.
- Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
+ Some(LazyBadRecordCauseWrapper(
+ () => QueryExecutionErrors.malformedCSVRecordError(currentInput.toString)))
} else None
// When the length of the returned tokens is identical to the length of the parsed schema,
// we just need to:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 65a56c1..654b0b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -67,16 +67,32 @@
extends Exception(cause)
/**
- * Exception thrown when the underlying parser meet a bad record and can't parse it.
+ * Exception thrown when the underlying parser meets a bad record and can't parse it.
+ * The stacktrace is not collected for better performance, and thus, this exception should
+ * not be used in a user-facing context.
* @param record a function to return the record that cause the parser to fail
* @param partialResults a function that returns an row array, which is the partial results of
* parsing this bad record.
- * @param cause the actual exception about why the record is bad and can't be parsed.
+ * @param cause the actual exception about why the record is bad and can't be parsed. It's better
+ * to use `LazyBadRecordCauseWrapper` here to delay heavy cause construction
+ * until it's needed.
*/
case class BadRecordException(
@transient record: () => UTF8String,
@transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
- cause: Throwable) extends Exception(cause)
+ cause: Throwable) extends Exception(cause) {
+ override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
+ override def fillInStackTrace(): Throwable = this
+}
+
+/**
+ * Exception to use as `BadRecordException` cause to delay heavy user-facing exception construction.
+ * Does not contain a stacktrace and is used only for control flow.
+ */
+case class LazyBadRecordCauseWrapper(cause: () => Throwable) extends Exception() {
+ override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
+ override def fillInStackTrace(): Throwable = this
+}
/**
* Exception thrown when the underlying parser parses a JSON array as a struct.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index 10cd159..d9946d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -78,10 +78,17 @@
case StringAsDataTypeException(fieldName, fieldValue, dataType) =>
throw QueryExecutionErrors.cannotParseStringAsDataTypeError(e.record().toString,
fieldName, fieldValue, dataType)
- case other => throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
- toResultRow(e.partialResults().headOption, e.record).toString, other)
+ case causeWrapper: LazyBadRecordCauseWrapper =>
+ throwMalformedRecordsDetectedInRecordParsingError(e, causeWrapper.cause())
+ case cause => throwMalformedRecordsDetectedInRecordParsingError(e, cause)
}
}
}
}
+
+ private def throwMalformedRecordsDetectedInRecordParsingError(
+ e: BadRecordException, cause: Throwable): Nothing = {
+ throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
+ toResultRow(e.partialResults().headOption, e.record).toString, cause)
+ }
}