/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.json

import java.io.{ByteArrayOutputStream, CharConversionException}
import java.nio.charset.MalformedInputException

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import com.fasterxml.jackson.core._

import org.apache.spark.SparkUpgradeException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.errors.{ExecutionErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
import org.apache.spark.types.variant._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}
import org.apache.spark.util.Utils

/**
* Constructs a parser for a given schema that translates a JSON string to an [[InternalRow]].
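*
* A minimal usage sketch (assuming `CreateJacksonParser.string`, `JSONOptions` and
* `StructType.fromDDL` as used elsewhere in Spark's catalyst JSON support):
*
* {{{
*   val schema = StructType.fromDDL("a INT, b STRING")
*   val options = new JSONOptions(Map.empty[String, String], "UTC", "")
*   val parser = new JacksonParser(schema, options, allowArrayAsStructs = false)
*   // Expect an Iterable with a single InternalRow equivalent to (1, "str").
*   val rows = parser.parse(
*     """{"a": 1, "b": "str"}""",
*     CreateJacksonParser.string,
*     UTF8String.fromString)
* }}}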
*/
class JacksonParser(
schema: DataType,
val options: JSONOptions,
allowArrayAsStructs: Boolean,
filters: Seq[Filter] = Seq.empty) extends Logging {
import JacksonUtils._
import com.fasterxml.jackson.core.JsonToken._
// A `ValueConverter` is responsible for converting a value from `JsonParser`
// to a value in a field of an `InternalRow`.
private type ValueConverter = JsonParser => AnyRef
// The root converter, built from `ValueConverter`s for all fields in the schema
private val rootConverter = makeRootConverter(schema)
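// Shared Jackson factory: used in `parse` to create a `JsonParser` per input record, and by
// the string-type converter below to copy non-string values as raw JSON text.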
private val factory = options.buildJsonFactory()
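// Formatters for timestamps, timestamps without time zone, and dates. They are lazy because
// they are only needed when the schema contains the corresponding types.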
private lazy val timestampFormatter = TimestampFormatter(
options.timestampFormatInRead,
options.zoneId,
options.locale,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)
private lazy val timestampNTZFormatter = TimestampFormatter(
options.timestampNTZFormatInRead,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true,
forTimestampNTZ = true)
private lazy val dateFormatter = DateFormatter(
options.dateFormatInRead,
options.locale,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)
// Flags to signal if we need to fall back to the backward compatible behavior of parsing
// dates and timestamps.
// For more information, see comments for "enableDateTimeParsingFallback" option in JSONOptions.
private val enableParsingFallbackForTimestampType =
options.enableDateTimeParsingFallback
.orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
.getOrElse {
SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
options.timestampFormatInRead.isEmpty
}
private val enableParsingFallbackForDateType =
options.enableDateTimeParsingFallback
.orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
.getOrElse {
SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
options.dateFormatInRead.isEmpty
}
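// If enabled, records where only some fields fail to parse produce partial results (see the
// `Partial*Exception` handling below) instead of being treated as completely corrupt.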
private val enablePartialResults = SQLConf.get.jsonEnablePartialResults
/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema. This is a wrapper for the method
* `makeConverter()` to handle a row wrapped with an array.
*/
private def makeRootConverter(dt: DataType): JsonParser => Iterable[InternalRow] = {
dt match {
case _: StructType if options.singleVariantColumn.isDefined => (parser: JsonParser) => {
Some(InternalRow(parseVariant(parser)))
}
case st: StructType => makeStructRootConverter(st)
case mt: MapType => makeMapRootConverter(mt)
case at: ArrayType => makeArrayRootConverter(at)
}
}
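/**
* Parses the JSON value at the parser's current position into a [[VariantVal]]. Unlike the
* other converters, a JSON null is kept as a variant null rather than a SQL NULL.
*/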
protected final def parseVariant(parser: JsonParser): VariantVal = {
// Skips `FIELD_NAME` at the beginning. This check is adapted from `parseJsonToken`, but we
// cannot directly use the function here because it also handles the `VALUE_NULL` token and
// returns null (representing a SQL NULL). Instead, we want to return a variant null.
if (parser.getCurrentToken == FIELD_NAME) {
parser.nextToken()
}
try {
val v = VariantBuilder.parseJson(parser)
new VariantVal(v.getValue, v.getMetadata)
} catch {
case _: VariantSizeLimitException =>
throw QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, "JacksonParser")
}
}
private def makeStructRootConverter(st: StructType): JsonParser => Iterable[InternalRow] = {
val elementConverter = makeConverter(st)
val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
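// When JSON filter pushdown is enabled, the pushed filters are evaluated while an object is
// being parsed so that non-matching rows can be skipped early (see `convertObject`).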
val jsonFilters = if (SQLConf.get.jsonFilterPushDown) {
new JsonFilters(filters, st)
} else {
new NoopFilters
}
(parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, st) {
case START_OBJECT => convertObject(parser, st, fieldConverters, jsonFilters, isRoot = true)
// SPARK-3308: support reading top-level JSON arrays, taking every element
// in such an array as a row
//
// For example, we support JSON data like the following:
//
// [{"a":"str_a_1"}]
// [{"a":"str_a_2"}, {"b":"str_b_3"}]
//
// resulting in:
//
// List([str_a_1,null])
// List([str_a_2,null], [null,str_b_3])
//
case START_ARRAY if allowArrayAsStructs =>
val array = convertArray(parser, elementConverter, isRoot = true, arrayAsStructs = true)
// Here, as we support reading top-level JSON arrays and treat every element
// in such an array as a row, this case is possible.
if (array.numElements() == 0) {
Array.empty[InternalRow]
} else {
array.toArray[InternalRow](schema)
}
case START_ARRAY =>
throw JsonArraysAsStructsException()
}
}
private def makeMapRootConverter(mt: MapType): JsonParser => Iterable[InternalRow] = {
val fieldConverter = makeConverter(mt.valueType)
(parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, mt) {
case START_OBJECT => Some(InternalRow(convertMap(parser, fieldConverter)))
}
}
private def makeArrayRootConverter(at: ArrayType): JsonParser => Iterable[InternalRow] = {
val elemConverter = makeConverter(at.elementType)
(parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, at) {
case START_ARRAY => Some(InternalRow(convertArray(parser, elemConverter)))
case START_OBJECT if at.elementType.isInstanceOf[StructType] =>
// This handles the case when an input JSON object is a structure but
// the specified schema is an array of structures. In that case, the input JSON is
// considered as an array of only one element of struct type.
// This behavior was introduced by changes for SPARK-19595.
//
// For example, if the specified schema is ArrayType(new StructType().add("i", IntegerType))
// and the JSON input is as below:
//
// [{"i": 1}, {"i": 2}]
// [{"i": 3}]
// {"i": 4}
//
// The last row is considered as an array with one element, and the result of the conversion is:
//
// Seq(Row(1), Row(2))
// Seq(Row(3))
// Seq(Row(4))
//
val st = at.elementType.asInstanceOf[StructType]
val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
Some(InternalRow(new GenericArrayData(convertObject(parser, st, fieldConverters).toArray)))
}
}
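// Locale-aware parser for decimal values given as JSON strings (see the DecimalType case below).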
private val decimalParser = ExprUtils.getDecimalParser(options.locale)
/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema.
*/
def makeConverter(dataType: DataType): ValueConverter = dataType match {
case BooleanType =>
(parser: JsonParser) => parseJsonToken[java.lang.Boolean](parser, dataType) {
case VALUE_TRUE => true
case VALUE_FALSE => false
}
case ByteType =>
(parser: JsonParser) => parseJsonToken[java.lang.Byte](parser, dataType) {
case VALUE_NUMBER_INT => parser.getByteValue
}
case ShortType =>
(parser: JsonParser) => parseJsonToken[java.lang.Short](parser, dataType) {
case VALUE_NUMBER_INT => parser.getShortValue
}
case IntegerType =>
(parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
case VALUE_NUMBER_INT => parser.getIntValue
}
case LongType =>
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_NUMBER_INT => parser.getLongValue
}
case FloatType =>
(parser: JsonParser) => parseJsonToken[java.lang.Float](parser, dataType) {
case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
parser.getFloatValue
case VALUE_STRING if parser.getTextLength >= 1 =>
// Special case handling for NaN and Infinity.
parser.getText match {
case "NaN" if options.allowNonNumericNumbers =>
Float.NaN
case "+INF" | "+Infinity" | "Infinity" if options.allowNonNumericNumbers =>
Float.PositiveInfinity
case "-INF" | "-Infinity" if options.allowNonNumericNumbers =>
Float.NegativeInfinity
case _ => throw StringAsDataTypeException(parser.currentName, parser.getText,
FloatType)
}
}
case DoubleType =>
(parser: JsonParser) => parseJsonToken[java.lang.Double](parser, dataType) {
case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
parser.getDoubleValue
case VALUE_STRING if parser.getTextLength >= 1 =>
// Special case handling for NaN and Infinity.
parser.getText match {
case "NaN" if options.allowNonNumericNumbers =>
Double.NaN
case "+INF" | "+Infinity" | "Infinity" if options.allowNonNumericNumbers =>
Double.PositiveInfinity
case "-INF" | "-Infinity" if options.allowNonNumericNumbers =>
Double.NegativeInfinity
case _ => throw StringAsDataTypeException(parser.currentName, parser.getText,
DoubleType)
}
}
case StringType =>
(parser: JsonParser) => parseJsonToken[UTF8String](parser, dataType) {
case VALUE_STRING =>
UTF8String.fromString(parser.getText)
case _ =>
// Note that this always converts the data to a string; there is no failure case here.
val writer = new ByteArrayOutputStream()
Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
generator => generator.copyCurrentStructure(parser)
}
UTF8String.fromBytes(writer.toByteArray)
}
case TimestampType =>
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
try {
timestampFormatter.parse(parser.getText)
} catch {
case NonFatal(e) =>
// If parsing fails, fall back to the parsing used in Spark 2.0 and 1.x for backwards
// compatibility, if enabled.
if (!enableParsingFallbackForTimestampType) {
throw e
}
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
}
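// A JSON integer is treated as the number of seconds since the epoch and converted to
// microseconds, the internal representation of TimestampType.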
case VALUE_NUMBER_INT =>
parser.getLongValue * 1000000L
}
case TimestampNTZType =>
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
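// The second argument is assumed to disallow a zone offset in the input, since
// TIMESTAMP_NTZ values carry no time zone.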
timestampNTZFormatter.parseWithoutTimeZone(parser.getText, false)
}
case DateType =>
(parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
try {
dateFormatter.parse(parser.getText)
} catch {
case NonFatal(e) =>
// If parsing fails, fall back to the parsing used in Spark 2.0 and 1.x for backwards
// compatibility, if enabled.
if (!enableParsingFallbackForDateType) {
throw e
}
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
DateTimeUtils.stringToDate(str).getOrElse {
// In Spark 1.5.0, we stored the data as the number of days since the epoch, in a string.
// So, we just convert it to Int.
try {
RebaseDateTime.rebaseJulianToGregorianDays(parser.getText.toInt)
} catch {
case _: NumberFormatException => throw e
}
}.asInstanceOf[Integer]
}
}
case BinaryType =>
(parser: JsonParser) => parseJsonToken[Array[Byte]](parser, dataType) {
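// Jackson decodes the JSON string as Base64-encoded binary data.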
case VALUE_STRING => parser.getBinaryValue
}
case dt: DecimalType =>
(parser: JsonParser) => parseJsonToken[Decimal](parser, dataType) {
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
Decimal(parser.getDecimalValue, dt.precision, dt.scale)
case VALUE_STRING if parser.getTextLength >= 1 =>
val bigDecimal = decimalParser(parser.getText)
Decimal(bigDecimal, dt.precision, dt.scale)
}
case CalendarIntervalType => (parser: JsonParser) =>
parseJsonToken[CalendarInterval](parser, dataType) {
case VALUE_STRING =>
IntervalUtils.safeStringToInterval(UTF8String.fromString(parser.getText))
}
case ym: YearMonthIntervalType => (parser: JsonParser) =>
parseJsonToken[Integer](parser, dataType) {
case VALUE_STRING =>
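// Reuse Cast's interval-string parsing by evaluating a cast of the text literal to the
// interval type.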
val expr = Cast(Literal(parser.getText), ym)
Integer.valueOf(expr.eval(EmptyRow).asInstanceOf[Int])
}
case dt: DayTimeIntervalType => (parser: JsonParser) =>
parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_STRING =>
val expr = Cast(Literal(parser.getText), dt)
java.lang.Long.valueOf(expr.eval(EmptyRow).asInstanceOf[Long])
}
case st: StructType =>
val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
(parser: JsonParser) => parseJsonToken[InternalRow](parser, dataType) {
case START_OBJECT => convertObject(parser, st, fieldConverters).get
}
case at: ArrayType =>
val elementConverter = makeConverter(at.elementType)
(parser: JsonParser) => parseJsonToken[ArrayData](parser, dataType) {
case START_ARRAY => convertArray(parser, elementConverter)
}
case mt: MapType =>
val valueConverter = makeConverter(mt.valueType)
(parser: JsonParser) => parseJsonToken[MapData](parser, dataType) {
case START_OBJECT => convertMap(parser, valueConverter)
}
case udt: UserDefinedType[_] =>
makeConverter(udt.sqlType)
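// For NullType, any JSON value is read as null.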
case _: NullType =>
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case _ => null
}
case _: VariantType => parseVariant
// We don't actually hit this exception; we keep it for understandability.
case _ => throw ExecutionErrors.unsupportedDataTypeError(dataType)
}
/**
* This method skips `FIELD_NAME`s at the beginning, and handles nulls before trying to parse
* the JSON token using the given function `f`. If `f` fails to parse and convert the token,
* `failedConversion` is called to handle it.
*/
@scala.annotation.tailrec
private def parseJsonToken[R >: Null](
parser: JsonParser,
dataType: DataType)(f: PartialFunction[JsonToken, R]): R = {
parser.getCurrentToken match {
case FIELD_NAME =>
// There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens
parser.nextToken()
parseJsonToken[R](parser, dataType)(f)
case null | VALUE_NULL => null
case other => f.applyOrElse(other, failedConversion(parser, dataType))
}
}
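// Legacy flag: when enabled, an empty string is read as null for most data types instead of
// causing an error (see `failedConversion`).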
private val allowEmptyString = SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_EMPTY_STRING_IN_JSON)
/**
* This function throws an exception for a failed conversion. For an empty string on data types
* other than string and binary, it also throws an exception.
*/
private def failedConversion[R >: Null](
parser: JsonParser,
dataType: DataType): PartialFunction[JsonToken, R] = {
// SPARK-25040: Disallows empty strings for data types except for string and binary types.
// But treats empty strings as null for certain types if the legacy config is enabled.
case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString =>
dataType match {
case FloatType | DoubleType | TimestampType | DateType =>
throw EmptyJsonFieldValueException(dataType)
case _ => null
}
case VALUE_STRING if parser.getTextLength < 1 =>
throw EmptyJsonFieldValueException(dataType)
case token =>
// We cannot parse this token based on the given data type. So, we throw a
// RuntimeException and this exception will be caught by the `parse` method.
throw CannotParseJSONFieldException(parser.currentName, parser.getText, token, dataType)
}
/**
* Parse an object from the token stream into a new Row representing the schema.
* Fields in the JSON that are not defined in the requested schema will be dropped.
*/
private def convertObject(
parser: JsonParser,
schema: StructType,
fieldConverters: Array[ValueConverter],
structFilters: StructFilters = new NoopFilters(),
isRoot: Boolean = false): Option[InternalRow] = {
val row = new GenericInternalRow(schema.length)
var badRecordException: Option[Throwable] = None
var skipRow = false
structFilters.reset()
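// Tracks which fields are missing from the input object so that their existence default
// values, if any, can be applied after the object has been parsed.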
lazy val bitmask = ResolveDefaultColumns.existenceDefaultsBitmask(schema)
resetExistenceDefaultsBitmask(schema, bitmask)
while (!skipRow && nextUntil(parser, JsonToken.END_OBJECT)) {
schema.getFieldIndex(parser.currentName) match {
case Some(index) =>
try {
row.update(index, fieldConverters(index).apply(parser))
skipRow = structFilters.skipRow(row, index)
bitmask(index) = false
} catch {
case e: SparkUpgradeException => throw e
case err: PartialValueException if enablePartialResults =>
badRecordException = badRecordException.orElse(Some(err.cause))
row.update(index, err.partialResult)
skipRow = structFilters.skipRow(row, index)
bitmask(index) = false
case NonFatal(e) if isRoot || enablePartialResults =>
badRecordException = badRecordException.orElse(Some(e))
parser.skipChildren()
}
case None =>
parser.skipChildren()
}
}
if (skipRow) {
None
} else if (badRecordException.isEmpty) {
applyExistenceDefaultValuesToRow(schema, row, bitmask)
Some(row)
} else {
throw PartialResultException(row, badRecordException.get)
}
}
/**
* Parse an object as a Map, preserving all fields.
*/
private def convertMap(
parser: JsonParser,
fieldConverter: ValueConverter): MapData = {
val keys = ArrayBuffer.empty[UTF8String]
val values = ArrayBuffer.empty[Any]
var badRecordException: Option[Throwable] = None
while (nextUntil(parser, JsonToken.END_OBJECT)) {
keys += UTF8String.fromString(parser.currentName)
try {
values += fieldConverter.apply(parser)
} catch {
case err: PartialValueException if enablePartialResults =>
badRecordException = badRecordException.orElse(Some(err.cause))
values += err.partialResult
case NonFatal(e) if enablePartialResults =>
badRecordException = badRecordException.orElse(Some(e))
parser.skipChildren()
}
}
// The JSON map will never have null or duplicated map keys, so it's safe to create an
// ArrayBasedMapData directly here.
val mapData = ArrayBasedMapData(keys.toArray, values.toArray)
if (badRecordException.isEmpty) {
mapData
} else {
throw PartialMapDataResultException(mapData, badRecordException.get)
}
}
/**
* Parse an array from the token stream into an ArrayData.
*/
private def convertArray(
parser: JsonParser,
fieldConverter: ValueConverter,
isRoot: Boolean = false,
arrayAsStructs: Boolean = false): ArrayData = {
val values = ArrayBuffer.empty[Any]
var badRecordException: Option[Throwable] = None
while (nextUntil(parser, JsonToken.END_ARRAY)) {
try {
val v = fieldConverter.apply(parser)
if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
values += v
} catch {
case err: PartialValueException if enablePartialResults =>
badRecordException = badRecordException.orElse(Some(err.cause))
values += err.partialResult
}
}
val arrayData = new GenericArrayData(values.toArray)
if (badRecordException.isEmpty) {
arrayData
} else if (arrayAsStructs) {
throw PartialResultArrayException(arrayData.toArray[InternalRow](schema),
badRecordException.get)
} else {
throw PartialArrayDataResultException(arrayData, badRecordException.get)
}
}
/**
* Converts the internal, stack-trace-free control-flow exceptions to user-friendly
* QueryExecutionErrors.
*/
private def convertCauseForPartialResult(err: Throwable): Throwable = err match {
case CannotParseJSONFieldException(fieldName, fieldValue, jsonType, dataType) =>
QueryExecutionErrors.cannotParseJSONFieldError(fieldName, fieldValue, jsonType, dataType)
case EmptyJsonFieldValueException(dataType) =>
QueryExecutionErrors.emptyJsonFieldValueError(dataType)
case _ => err
}
/**
* Parse the JSON input to the set of [[InternalRow]]s.
*
* @param recordLiteral an optional function that will be used to generate
* the corrupt record text instead of record.toString
*/
def parse[T](
record: T,
createParser: (JsonFactory, T) => JsonParser,
recordLiteral: T => UTF8String): Iterable[InternalRow] = {
try {
Utils.tryWithResource(createParser(factory, record)) { parser =>
// a null first token is equivalent to testing for input.trim.isEmpty
// but it works on any token stream and not just strings
parser.nextToken() match {
case null => None
case _ => rootConverter.apply(parser) match {
case null => throw QueryExecutionErrors.rootConverterReturnNullError()
case rows => rows.toSeq
}
}
}
} catch {
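// SparkUpgradeException carries its own migration guidance, so it is rethrown as-is rather
// than being wrapped in a BadRecordException.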
case e: SparkUpgradeException => throw e
case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) =>
// JSON parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => recordLiteral(record), cause = () => e)
case e: CharConversionException if options.encoding.isEmpty =>
val msg =
"""JSON parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
throw BadRecordException(() => recordLiteral(record), cause = () => wrappedCharException)
case PartialResultException(row, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => Array(row),
cause = () => convertCauseForPartialResult(cause))
case PartialResultArrayException(rows, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => rows,
cause = () => cause)
// These exceptions should never be thrown outside of JacksonParser.
// They are used for the control flow in the parser. We add them here for completeness
// since they also indicate a bad record.
case PartialArrayDataResultException(arrayData, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => Array(InternalRow(arrayData)),
cause = () => convertCauseForPartialResult(cause))
case PartialMapDataResultException(mapData, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => Array(InternalRow(mapData)),
cause = () => convertCauseForPartialResult(cause))
}
}
}