[SPARK-48169][SQL] Use lazy BadRecordException cause in all parsers and remove the old constructor, which was meant for the migration
### What changes were proposed in this pull request?
Use a factory function for the exception cause in `BadRecordException` to avoid constructing heavy exceptions in the underlying parser. They are now constructed on demand in `FailureSafeParser`. This is a follow-up to https://github.com/apache/spark/pull/46400
### Why are the changes needed?
- Speed up `JacksonParser` and `StaxXmlParser`, since they throw user-facing exceptions to `FailureSafeParser`
- Refactoring: leave only one constructor in `BadRecordException`
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- `testOnly org.apache.spark.sql.catalyst.json.JacksonParserSuite`
- `testOnly org.apache.spark.sql.catalyst.csv.UnivocityParserSuite`
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #46438 from vladimirg-db/vladimirg-db/use-lazy-exception-cause-in-all-bad-record-exception-invocations.
Authored-by: Vladimir Golubev <vladimir.golubev@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 37d9143..8d06789 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -359,7 +359,7 @@
} else {
if (badRecordException.isDefined) {
throw BadRecordException(
- () => currentInput, () => Array[InternalRow](requiredRow.get), badRecordException.get)
+ () => currentInput, () => Array(requiredRow.get), badRecordException.get)
} else {
requiredRow
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index d1093a3..3c42f72 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -613,7 +613,7 @@
// JSON parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
- throw BadRecordException(() => recordLiteral(record), cause = e)
+ throw BadRecordException(() => recordLiteral(record), cause = () => e)
case e: CharConversionException if options.encoding.isEmpty =>
val msg =
"""JSON parser cannot handle a character in its input.
@@ -621,17 +621,17 @@
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
- throw BadRecordException(() => recordLiteral(record), cause = wrappedCharException)
+ throw BadRecordException(() => recordLiteral(record), cause = () => wrappedCharException)
case PartialResultException(row, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => Array(row),
- convertCauseForPartialResult(cause))
+ cause = () => convertCauseForPartialResult(cause))
case PartialResultArrayException(rows, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => rows,
- cause)
+ cause = () => cause)
// These exceptions should never be thrown outside of JacksonParser.
// They are used for the control flow in the parser. We add them here for completeness
// since they also indicate a bad record.
@@ -639,12 +639,12 @@
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => Array(InternalRow(arrayData)),
- convertCauseForPartialResult(cause))
+ cause = () => convertCauseForPartialResult(cause))
case PartialMapDataResultException(mapData, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => Array(InternalRow(mapData)),
- convertCauseForPartialResult(cause))
+ cause = () => convertCauseForPartialResult(cause))
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 84f183a..c4fcdf4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -77,7 +77,7 @@
*/
case class BadRecordException(
@transient record: () => UTF8String,
- @transient partialResults: () => Array[InternalRow],
+ @transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
@transient cause: () => Throwable)
extends Exception() {
@@ -85,14 +85,6 @@
override def fillInStackTrace(): Throwable = this
}
-object BadRecordException {
- def apply(
- record: () => UTF8String,
- partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
- cause: Throwable): BadRecordException =
- new BadRecordException(record, partialResults, () => cause)
-}
-
/**
* Exception thrown when the underlying parser parses a JSON array as a struct.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index 2b30fe2..2b237ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -148,25 +148,27 @@
// XML parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
- throw BadRecordException(() => xmlRecord, cause = e)
+ throw BadRecordException(() => xmlRecord, cause = () => e)
case e: CharConversionException if options.charset.isEmpty =>
- val msg =
- """XML parser cannot handle a character in its input.
- |Specifying encoding as an input option explicitly might help to resolve the issue.
- |""".stripMargin + e.getMessage
- val wrappedCharException = new CharConversionException(msg)
- wrappedCharException.initCause(e)
- throw BadRecordException(() => xmlRecord, cause = wrappedCharException)
+ throw BadRecordException(() => xmlRecord, cause = () => {
+ val msg =
+ """XML parser cannot handle a character in its input.
+ |Specifying encoding as an input option explicitly might help to resolve the issue.
+ |""".stripMargin + e.getMessage
+ val wrappedCharException = new CharConversionException(msg)
+ wrappedCharException.initCause(e)
+ wrappedCharException
+ })
case PartialResultException(row, cause) =>
throw BadRecordException(
record = () => xmlRecord,
partialResults = () => Array(row),
- cause)
+ () => cause)
case PartialResultArrayException(rows, cause) =>
throw BadRecordException(
record = () => xmlRecord,
partialResults = () => rows,
- cause)
+ () => cause)
}
}