package org.apache.spark.sql.catalyst.expressions
import scala.collection.mutable.ArrayBuffer
import scala.util.parsing.combinator.RegexParsers
import com.fasterxml.jackson.core._
import com.fasterxml.jackson.core.json.JsonReadFeature
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, CodegenFallback, ExprCode}
import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.trees.TreePattern.{JSON_TO_STRUCT, TreePattern}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.types.StringTypeAnyCollation
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
import org.apache.spark.util.Utils
// Instruction set that `JsonPathParser` produces: a JSON path string is parsed into a
// `List[PathInstruction]`, which `GetJsonObjectEvaluator.evaluatePath` pattern-matches on.
private[this] sealed trait PathInstruction
private[this] object PathInstruction {
// Prefix marker for an array subscript; the parser always follows it with `Wildcard` or `Index`.
private[expressions] case object Subscript extends PathInstruction
// Matches any array element (`[*]`) or any child field (`.*` / `['*']`).
private[expressions] case object Wildcard extends PathInstruction
// Prefix marker for an object field lookup; the parser always follows it with `Named`.
private[expressions] case object Key extends PathInstruction
// Zero-based array position taken from a `[123]` subscript.
private[expressions] case class Index(index: Long) extends PathInstruction
// Field name taken from a `.name` or `['name']` step.
private[expressions] case class Named(name: String) extends PathInstruction
// Controls how `GetJsonObjectEvaluator.evaluatePath` writes matched values to the generator.
private[this] sealed trait WriteStyle
private[this] object WriteStyle {
// Style used for the top-level call: a matched string is emitted without enclosing quotes.
private[expressions] case object RawStyle extends WriteStyle
// Style used for matches beneath an array wildcard, where raw output cannot be used.
private[expressions] case object QuotedStyle extends WriteStyle
// Style under which nested arrays are flattened into their parent (double-wildcard handling).
private[expressions] case object FlattenStyle extends WriteStyle
// Parser-combinator grammar that turns a JSON path string such as `$.a[0]['b']` into a flat
// list of `PathInstruction`s. `expression` requires the root marker followed by zero or more
// nodes and, because of `phrase`, must consume the whole input.
private[this] object JsonPathParser extends RegexParsers {
import PathInstruction._
// every path starts with the root marker `$`
def root: Parser[Char] = '$'
// a run of decimal digits converted with `toLong`
// NOTE(review): `toLong` throws NumberFormatException when the digits exceed the Long range;
// confirm whether callers are expected to handle that.
def long: Parser[Long] = "\\d+".r ^? {
case x => x.toLong
// parse `[*]` and `[123]` subscripts
def subscript: Parser[List[PathInstruction]] =
for {
operand <- '[' ~> ('*' ^^^ Wildcard | long ^^ Index) <~ ']'
} yield {
Subscript :: operand :: Nil
// parse `.name` or `['name']` child expressions
def named: Parser[List[PathInstruction]] =
for {
name <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\']+".r <~ "']"
} yield {
Key :: Named(name) :: Nil
// child wildcards: `..`, `.*` or `['*']`
// NOTE(review): only the literals `.*` and `['*']` are matched below; no alternative for `..`
// is visible.
def wildcard: Parser[List[PathInstruction]] =
(".*" | "['*']") ^^^ List(Wildcard)
// a single path step; `wildcard` is tried before `named` so that `.*` is not read as a field name
def node: Parser[List[PathInstruction]] =
wildcard |
named |
val expression: Parser[List[PathInstruction]] = {
phrase(root ~> rep(node) ^^ (x => x.flatten))
// Parses `str` with `expression` and maps the parse outcome to an Option.
// NOTE(review): the result expressions of both cases are not visible in this view.
def parse(str: String): Option[List[PathInstruction]] = {
this.parseAll(expression, str) match {
case Success(result, _) =>
case _ =>
// Holder for the Jackson `jsonFactory` that the JSON expressions in this file import or
// reference (e.g. `SharedFactory.jsonFactory`).
private[this] object SharedFactory {
val jsonFactory = new JsonFactoryBuilder()
// The two options below are enabled for Hive compatibility
* Extracts a JSON object from a JSON string based on the specified JSON path, and returns the
* JSON string of the extracted object. Returns null if the input JSON string is invalid.
usage = "_FUNC_(json_txt, path) - Extracts a json object from `path`.",
examples = """
> SELECT _FUNC_('{"a":"b"}', '$.a');
group = "json_funcs",
since = "1.5.0")
// Expression behind `get_json_object(json, path)`: both children are strings and the result is
// a nullable string computed by a `GetJsonObjectEvaluator`.
case class GetJsonObject(json: Expression, path: Expression)
extends BinaryExpression with ExpectsInputTypes {
override def left: Expression = json
override def right: Expression = path
override def inputTypes: Seq[AbstractDataType] =
Seq(StringTypeAnyCollation, StringTypeAnyCollation)
override def dataType: DataType = SQLConf.get.defaultStringType
// always nullable: the evaluator returns null for null JSON input and on JSON processing errors
override def nullable: Boolean = true
override def prettyName: String = "get_json_object"
// A foldable path is evaluated once here and handed to the evaluator as its cached path;
// otherwise the evaluator is created without one.
private lazy val evaluator = if (path.foldable) {
new GetJsonObjectEvaluator(path.eval().asInstanceOf[UTF8String])
} else {
new GetJsonObjectEvaluator()
// Interpreted evaluation.
// NOTE(review): only the `!path.foldable` guard is visible in this view; the generated code
// below shows the corresponding pattern (set the inputs on the evaluator, then `evaluate()`).
override def eval(input: InternalRow): Any = {
if (!path.foldable) {
// Generates Java code that keeps one evaluator instance as mutable state, feeds it the JSON
// value (and, for a non-foldable path, the path value) and reads the result of `evaluate()`.
protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val evaluatorClass = classOf[GetJsonObjectEvaluator].getName
// a foldable, non-null path is evaluated at codegen time and passed to the evaluator's
// constructor as a reference object; every other case uses the no-argument constructor
val initEvaluator = path.foldable match {
case true if path.eval() != null =>
val cachedPath = path.eval().asInstanceOf[UTF8String]
val refCachedPath = ctx.addReferenceObj("cachedPath", cachedPath)
s"new $evaluatorClass($refCachedPath)"
case _ => s"new $evaluatorClass()"
val evaluator = ctx.addMutableState(evaluatorClass, "evaluator",
v => s"""$v = $initEvaluator;""", forceInline = true)
val jsonEval = json.genCode(ctx)
val pathEval = path.genCode(ctx)
// Java snippet that forwards the (possibly null) JSON value to the evaluator
val setJson =
|if (${jsonEval.isNull}) {
| $evaluator.setJson(null);
|} else {
| $evaluator.setJson(${jsonEval.value});
// the path is only forwarded per row when it is not foldable
val setPath = if (!path.foldable) {
|if (${pathEval.isNull}) {
| $evaluator.setPath(null);
|} else {
| $evaluator.setPath(${pathEval.value});
} else {
// the boxed type lets the generated code compare the evaluator's result with null
val resultType = CodeGenerator.boxedType(dataType)
val resultTerm = ctx.freshName("result")
ev.copy(code =
|$resultType $resultTerm = ($resultType) $evaluator.evaluate();
|boolean ${ev.isNull} = $resultTerm == null;
|${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
|if (!${ev.isNull}) {
| ${ev.value} = $resultTerm;
// rebuilds the expression with replaced children: left is the JSON, right is the path
override protected def withNewChildrenInternal(
newLeft: Expression, newRight: Expression): GetJsonObject =
copy(json = newLeft, path = newRight)
// Stateful worker for `get_json_object`: callers set the JSON text (and optionally the path)
// through the setters and then call `evaluate()`. `cachedPath` is null when the instance is
// created through the no-argument constructor.
class GetJsonObjectEvaluator(cachedPath: UTF8String) {
import com.fasterxml.jackson.core.JsonToken._
import PathInstruction._
import SharedFactory._
import WriteStyle._
def this() = this(null)
// NOTE(review): the initializer is not visible in this view; presumably the parsed form of
// `cachedPath` — confirm.
private lazy val parsedPath: Option[List[PathInstruction]] =
private var jsonStr: UTF8String = null
private var pathStr: UTF8String = null
// stores the JSON text used by the next `evaluate()` call; may be null
def setJson(arg: UTF8String): Unit = {
jsonStr = arg
// stores the path text used by the next `evaluate()` call; may be null
def setPath(arg: UTF8String): Unit = {
pathStr = arg
// Evaluates the current path against the current JSON text. Returns null when no JSON text is
// set, and null when Jackson raises a JsonProcessingException.
def evaluate(): Any = {
if (jsonStr == null) {
return null
// pick the path depending on whether a cached path was supplied at construction
val parsed = if (cachedPath != null) {
} else {
if (parsed.isDefined) {
try {
/* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson
detect character encoding which could fail for some malformed strings */
Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, jsonStr)) { parser =>
val output = new ByteArrayOutputStream()
// `matched` tells whether evaluatePath wrote anything into `output`
val matched = Utils.tryWithResource(
jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator =>
evaluatePath(parser, generator, RawStyle, parsed.get)
if (matched) {
} else {
} catch {
case _: JsonProcessingException => null
} else {
// Turns a path string into path instructions, guarding against a null path.
// NOTE(review): the bodies of both branches are not visible in this view.
private def parsePath(path: UTF8String): Option[List[PathInstruction]] = {
if (path != null) {
} else {
// advance to the desired array index, assumes to start at the START_ARRAY token
// `f` is invoked once the requested index is reached; the returned function takes the number
// of elements that remain to be skipped.
private def arrayIndex(p: JsonParser, f: () => Boolean): Long => Boolean = {
case _ if p.getCurrentToken == END_ARRAY =>
// terminate, nothing has been written
case 0 =>
// we've reached the desired index
val dirty = f()
while (p.nextToken() != END_ARRAY) {
// advance the token stream to the end of the array
case i if i > 0 =>
// skip this token and evaluate the next
arrayIndex(p, f)(i - 1)
* Evaluate a list of JsonPath instructions, returning a bool that indicates if any leaf nodes
* have been written to the generator
private def evaluatePath(
p: JsonParser,
g: JsonGenerator,
style: WriteStyle,
path: List[PathInstruction]): Boolean = {
// dispatch on the parser's current token together with the remaining instructions;
// an empty instruction list (`Nil`) means the current value is a match to be written
(p.getCurrentToken, path) match {
case (VALUE_STRING, Nil) if style == RawStyle =>
// there is no array wildcard or slice parent, emit this string without quotes
if (p.hasTextCharacters) {
g.writeRaw(p.getTextCharacters, p.getTextOffset, p.getTextLength)
} else {
case (START_ARRAY, Nil) if style == FlattenStyle =>
// flatten this array into the parent
var dirty = false
while (p.nextToken() != END_ARRAY) {
dirty |= evaluatePath(p, g, style, Nil)
case (_, Nil) =>
// general case: just copy the child tree verbatim
case (START_OBJECT, Key :: xs) =>
var dirty = false
while (p.nextToken() != END_OBJECT) {
if (dirty) {
// once a match has been found we can skip other fields
} else {
dirty = evaluatePath(p, g, style, xs)
case (START_ARRAY, Subscript :: Wildcard :: Subscript :: Wildcard :: xs) =>
// special handling for the non-structure preserving double wildcard behavior in Hive
var dirty = false
while (p.nextToken() != END_ARRAY) {
dirty |= evaluatePath(p, g, FlattenStyle, xs)
case (START_ARRAY, Subscript :: Wildcard :: xs) if style != QuotedStyle =>
// retain Flatten, otherwise use Quoted... cannot use Raw within an array
val nextStyle = style match {
case RawStyle => QuotedStyle
case FlattenStyle => FlattenStyle
case QuotedStyle => throw SparkException.internalError("Unexpected the quoted style.")
// temporarily buffer child matches, the emitted json will need to be
// modified slightly if there is only a single element written
val buffer = new StringWriter()
// here `dirty` is a count of matched elements rather than a flag
var dirty = 0
Utils.tryWithResource(jsonFactory.createGenerator(buffer)) { flattenGenerator =>
while (p.nextToken() != END_ARRAY) {
// track the number of array elements and only emit an outer array if
// we've written more than one element, this matches Hive's behavior
dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0)
val buf = buffer.getBuffer
if (dirty > 1) {
} else if (dirty == 1) {
// remove outer array tokens
g.writeRawValue(buf.substring(1, buf.length() - 1))
} // else do not write anything
dirty > 0
case (START_ARRAY, Subscript :: Wildcard :: xs) =>
// reached only when style == QuotedStyle, because the guarded case above takes the rest
var dirty = false
while (p.nextToken() != END_ARRAY) {
// wildcards can have multiple matches, continually update the dirty count
dirty |= evaluatePath(p, g, QuotedStyle, xs)
case (START_ARRAY, Subscript :: Index(idx) :: (xs@Subscript :: Wildcard :: _)) =>
// we're going to have 1 or more results, switch to QuotedStyle
arrayIndex(p, () => evaluatePath(p, g, QuotedStyle, xs))(idx)
case (START_ARRAY, Subscript :: Index(idx) :: xs) =>
arrayIndex(p, () => evaluatePath(p, g, style, xs))(idx)
case (FIELD_NAME, Named(name) :: xs) if p.currentName == name =>
// exact field match
if (p.nextToken() != JsonToken.VALUE_NULL) {
evaluatePath(p, g, style, xs)
} else {
case (FIELD_NAME, Wildcard :: xs) =>
// wildcard field match
evaluatePath(p, g, style, xs)
case _ =>
// scalastyle:off line.size.limit
usage = "_FUNC_(jsonStr, p1, p2, ..., pn) - Returns a tuple like the function get_json_object, but it takes multiple names. All the input parameters and output column types are string.",
examples = """
> SELECT _FUNC_('{"a":1, "b":2}', 'a', 'b');
1 2
group = "json_funcs",
since = "1.6.0")
// scalastyle:on line.size.limit
// Generator behind `json_tuple(jsonStr, p1, ..., pn)`: the first child is the JSON text and the
// remaining children name the top-level fields to extract; one row with one column per
// requested field is produced.
case class JsonTuple(children: Seq[Expression])
extends Generator
with CodegenFallback
with QueryErrorsBase {
import SharedFactory._
override def nullable: Boolean = {
// a row is always returned
// if processing fails this shared value will be returned
@transient private lazy val nullRow: Seq[InternalRow] =
new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) :: Nil
// the json body is the first child
@transient private lazy val jsonExpr: Expression = children.head
// the fields to query are the remaining children
@transient private lazy val fieldExpressions: Seq[Expression] = children.tail
// eagerly evaluate any foldable field names
// an entry is null (not None) for a non-foldable expression; None marks a foldable expression
// that evaluated to null
@transient private lazy val foldableFieldNames: IndexedSeq[Option[String]] = { {
case expr if expr.foldable => Option(expr.eval()).map(_.asInstanceOf[UTF8String].toString)
case _ => null
// and count the number of foldable fields, we'll use this later to optimize evaluation
@transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null)
// output columns are named `c<idx>` and share the data type of the JSON child
override def elementSchema: StructType = StructType( {
case (_, idx) => StructField(s"c$idx", children.head.dataType, nullable = true)
override def prettyName: String = "json_tuple"
// requires at least two children (throws otherwise) and all children to be string-typed
override def checkInputDataTypes(): TypeCheckResult = {
if (children.length < 2) {
throw QueryCompilationErrors.wrongNumArgsError(
toSQLId(prettyName), Seq("> 1"), children.length
} else if (children.forall(child => StringTypeAnyCollation.acceptsType(child.dataType))) {
} else {
errorSubClass = "NON_STRING_TYPE",
messageParameters = Map("funcName" -> toSQLId(prettyName)))
// Evaluates the JSON child for this row; null JSON yields `nullRow`, otherwise the text is
// parsed and handed to `parseRow`.
override def eval(input: InternalRow): IterableOnce[InternalRow] = {
val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
if (json == null) {
return nullRow
try {
/* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson
detect character encoding which could fail for some malformed strings */
Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser =>
parseRow(parser, input)
} catch {
case _: JsonProcessingException =>
// Walks the top-level object and fills one output slot per requested field name.
private def parseRow(parser: JsonParser, input: InternalRow): Seq[InternalRow] = {
// only objects are supported
if (parser.nextToken() != JsonToken.START_OBJECT) {
return nullRow
// evaluate the field names as String rather than UTF8String to
// optimize lookups from the json token, which is also a String
val fieldNames = if (constantFields == fieldExpressions.length) {
// typically the user will provide the field names as foldable expressions
// so we can use the cached copy
} else if (constantFields == 0) {
// none are foldable so all field names need to be evaluated from the input row { expr =>
} else {
// if there is a mix of constant and non-constant expressions
// prefer the cached copy when available {
case (null, expr) =>
case (fieldName, _) => fieldName.orNull
val row = Array.ofDim[Any](fieldNames.length)
// start reading through the token stream, looking for any requested field names
while (parser.nextToken() != JsonToken.END_OBJECT) {
if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
// check to see if this field is desired in the output
val jsonField = parser.currentName
var idx = fieldNames.indexOf(jsonField)
if (idx >= 0) {
// it is, copy the child tree to the correct location in the output row
val output = new ByteArrayOutputStream()
// write the output directly to UTF8 encoded byte array
if (parser.nextToken() != JsonToken.VALUE_NULL) {
Utils.tryWithResource(jsonFactory.createGenerator(output, JsonEncoding.UTF8)) {
generator => copyCurrentStructure(generator, parser)
val jsonValue = UTF8String.fromBytes(output.toByteArray)
// SPARK-21804: json_tuple returns null values within repeated columns
// except the first one; so that we need to check the remaining fields.
do {
row(idx) = jsonValue
idx = fieldNames.indexOf(jsonField, idx + 1)
} while (idx >= 0)
// always skip children, it's cheap enough to do even if copyCurrentStructure was called
new GenericInternalRow(row) :: Nil
// Copies the value at the parser's current token into `generator`; must not be called while
// the parser is positioned on a null token (an internal error is thrown in that case).
private def copyCurrentStructure(generator: JsonGenerator, parser: JsonParser): Unit = {
parser.getCurrentToken match {
// if the user requests a string field it needs to be returned without enclosing
// quotes which is accomplished via JsonGenerator.writeRaw instead of JsonGenerator.write
case JsonToken.VALUE_STRING if parser.hasTextCharacters =>
// slight optimization to avoid allocating a String instance, though the characters
// still have to be decoded... Jackson doesn't have a way to access the raw bytes
generator.writeRaw(parser.getTextCharacters, parser.getTextOffset, parser.getTextLength)
case JsonToken.VALUE_STRING =>
// the normal String case, pass it through to the output without enclosing quotes
case JsonToken.VALUE_NULL =>
// a special case that needs to be handled outside of this method.
// if a requested field is null, the result must be null. the easiest
// way to achieve this is just by ignoring null tokens entirely
throw SparkException.internalError("Do not attempt to copy a null field.")
case _ =>
// handle other types including objects, arrays, booleans and numbers
override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): JsonTuple =
copy(children = newChildren)
* Converts a JSON input string to a [[StructType]], [[ArrayType]] or [[MapType]]
* with the specified schema.
// scalastyle:off line.size.limit
usage = "_FUNC_(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.",
examples = """
> SELECT _FUNC_('{"a":1, "b":0.8}', 'a INT, b DOUBLE');
> SELECT _FUNC_('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
{"time":2015-08-26 00:00:00}
> SELECT _FUNC_('{"teacher": "Alice", "student": [{"name": "Bob", "rank": 1}, {"name": "Charlie", "rank": 2}]}', 'STRUCT<teacher: STRING, student: ARRAY<STRUCT<name: STRING, rank: INT>>>');
group = "json_funcs",
since = "2.2.0")
// scalastyle:on line.size.limit
// Expression behind `from_json`: parses a JSON string child into a value whose type is the
// nullable version of the supplied schema.
case class JsonToStructs(
schema: DataType,
options: Map[String, String],
child: Expression,
timeZoneId: Option[String] = None)
extends UnaryExpression
with TimeZoneAwareExpression
with CodegenFallback
with ExpectsInputTypes
with NullIntolerant
with QueryErrorsBase {
// The JSON input data might be missing certain fields. We force the nullability
// of the user-provided schema to avoid data corruptions. In particular, the parquet-mr encoder
// can generate incorrect files if values are missing in columns declared as non-nullable.
val nullableSchema = schema.asNullable
override def nullable: Boolean = true
final override def nodePatternsInternal(): Seq[TreePattern] = Seq(JSON_TO_STRUCT)
// Used in `FunctionRegistry`
// the schema arrives as an expression and is resolved to a DataType via ExprUtils.evalTypeExpr
def this(child: Expression, schema: Expression, options: Map[String, String]) =
schema = ExprUtils.evalTypeExpr(schema),
options = options,
child = child,
timeZoneId = None)
def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String])
// variant taking the options as an expression, converted via ExprUtils.convertToMapData
def this(child: Expression, schema: Expression, options: Expression) =
schema = ExprUtils.evalTypeExpr(schema),
options = ExprUtils.convertToMapData(options),
child = child,
timeZoneId = None)
// struct, array, map and variant schemas are accepted (subject to ExprUtils.checkJsonSchema and
// the inherited input-type check); any other schema is reported as INVALID_JSON_SCHEMA
override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
case _: StructType | _: ArrayType | _: MapType | _: VariantType =>
val checkResult = ExprUtils.checkJsonSchema(nullableSchema)
if (checkResult.isFailure) checkResult else super.checkInputDataTypes()
case _ =>
errorSubClass = "INVALID_JSON_SCHEMA",
messageParameters = Map("schema" -> toSQLType(nullableSchema)))
// This converts parsed rows to the desired output by the given schema.
// each converter yields null when the parser produced no rows
lazy val converter = nullableSchema match {
case _: StructType =>
(rows: Iterator[InternalRow]) => if (rows.hasNext) else null
case _: ArrayType =>
(rows: Iterator[InternalRow]) => if (rows.hasNext) else null
case _: MapType =>
(rows: Iterator[InternalRow]) => if (rows.hasNext) else null
val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
// Built lazily; `timeZoneId.get` fails if no time zone has been set on this expression.
@transient lazy val parser = {
val parsedOptions = new JSONOptions(options, timeZoneId.get, nameOfCorruptRecord)
val mode = parsedOptions.parseMode
// only the permissive and fail-fast parse modes are supported here
if (mode != PermissiveMode && mode != FailFastMode) {
throw QueryCompilationErrors.parseModeUnsupportedError("from_json", mode)
// a non-struct schema is wrapped into a single-field struct named "value" for the parser
// NOTE(review): the filterNot predicate below looks truncated in this view; presumably it
// drops the corrupt-record column from the schema handed to the raw parser — confirm.
val (parserSchema, actualSchema) = nullableSchema match {
case s: StructType =>
ExprUtils.verifyColumnNameOfCorruptRecord(s, parsedOptions.columnNameOfCorruptRecord)
(s, StructType(s.filterNot( == parsedOptions.columnNameOfCorruptRecord)))
case other =>
(StructType(Array(StructField("value", other))), other)
val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = false)
val createParser = CreateJacksonParser.utf8String _
new FailureSafeParser[UTF8String](
input => rawParser.parse(input, createParser, identity[UTF8String]),
override def dataType: DataType = nullableSchema
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))
// variant schemas take a separate evaluation branch from all other schemas
override def nullSafeEval(json: Any): Any = nullableSchema match {
case _: VariantType =>
case _ =>
override def inputTypes: Seq[AbstractDataType] = StringTypeAnyCollation :: Nil
// for a map schema the SQL text is the literal "entries"; otherwise the default is used
override def sql: String = schema match {
case _: MapType => "entries"
case _ => super.sql
override def prettyName: String = "from_json"
override protected def withNewChildInternal(newChild: Expression): JsonToStructs =
copy(child = newChild)
* Converts a [[StructType]], [[ArrayType]] or [[MapType]] to a JSON output string.
// scalastyle:off line.size.limit
usage = "_FUNC_(expr[, options]) - Returns a JSON string with a given struct value",
examples = """
> SELECT _FUNC_(named_struct('a', 1, 'b', 2));
> SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
> SELECT _FUNC_(array(named_struct('a', 1, 'b', 2)));
> SELECT _FUNC_(map('a', named_struct('b', 1)));
> SELECT _FUNC_(map(named_struct('a', 1),named_struct('b', 2)));
> SELECT _FUNC_(map('a', 1));
> SELECT _FUNC_(array(map('a', 1)));
group = "json_funcs",
since = "2.2.0")
// scalastyle:on line.size.limit
// Expression behind `to_json`: renders a struct, array, map or variant child as a JSON string.
case class StructsToJson(
options: Map[String, String],
child: Expression,
timeZoneId: Option[String] = None)
extends UnaryExpression
with TimeZoneAwareExpression
with CodegenFallback
with ExpectsInputTypes
with NullIntolerant
with QueryErrorsBase {
override def nullable: Boolean = true
def this(options: Map[String, String], child: Expression) = this(options, child, None)
// Used in `FunctionRegistry`
def this(child: Expression) = this(Map.empty, child, None)
// variant taking the options as an expression, converted via ExprUtils.convertToMapData
def this(child: Expression, options: Expression) =
options = ExprUtils.convertToMapData(options),
child = child,
timeZoneId = None)
// `writer` is the buffer that `gen` writes into; both are created lazily, so the
// `timeZoneId.get` below is only evaluated on first use (it fails if no time zone is set)
lazy val writer = new CharArrayWriter()
lazy val gen = new JacksonGenerator(
inputSchema, writer, new JSONOptions(options, timeZoneId.get))
lazy val inputSchema = child.dataType
// This converts rows to the JSON output according to the given schema.
lazy val converter: Any => UTF8String = {
// reads the text accumulated in `writer`
// NOTE(review): the remaining statements of this helper are not visible in this view.
def getAndReset(): UTF8String = {
val json = writer.toString
// one conversion function per supported input type
inputSchema match {
case _: StructType =>
(row: Any) =>
case _: ArrayType =>
(arr: Any) =>
case _: MapType =>
(map: Any) =>
case _: VariantType =>
(v: Any) =>
override def dataType: DataType = SQLConf.get.defaultStringType
// struct, map, array and variant inputs are verified by JacksonUtils.verifyType; any other
// input type is reported as INVALID_JSON_SCHEMA
override def checkInputDataTypes(): TypeCheckResult = inputSchema match {
case dt @ (_: StructType | _: MapType | _: ArrayType | _: VariantType) =>
JacksonUtils.verifyType(prettyName, dt)
case _ =>
errorSubClass = "INVALID_JSON_SCHEMA",
messageParameters = Map("schema" -> toSQLType(child.dataType)))
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))
override def nullSafeEval(value: Any): Any = converter(value)
// NOTE(review): lists only array and struct, whereas checkInputDataTypes above also accepts
// map and variant — confirm this is intentional.
override def inputTypes: Seq[AbstractDataType] = TypeCollection(ArrayType, StructType) :: Nil
override def prettyName: String = "to_json"
override protected def withNewChildInternal(newChild: Expression): StructsToJson =
copy(child = newChild)
* A function that infers the schema of a JSON string.
usage = "_FUNC_(json[, options]) - Returns schema in the DDL format of JSON string.",
examples = """
> SELECT _FUNC_('[{"col":0}]');
> SELECT _FUNC_('[{"col":01}]', map('allowNumericLeadingZeros', 'true'));
group = "json_funcs",
since = "2.4.0")
// Expression behind `schema_of_json`: infers a schema from a foldable JSON string child.
case class SchemaOfJson(
child: Expression,
options: Map[String, String])
extends UnaryExpression with CodegenFallback with QueryErrorsBase {
def this(child: Expression) = this(child, Map.empty[String, String])
// variant taking the options as an expression, converted via ExprUtils.convertToMapData
def this(child: Expression, options: Expression) = this(
child = child,
options = ExprUtils.convertToMapData(options))
override def dataType: DataType = SQLConf.get.defaultStringType
override def nullable: Boolean = false
// the options are always built with the "UTC" time zone here
private lazy val jsonOptions = new JSONOptions(options, "UTC")
private lazy val jsonFactory = jsonOptions.buildJsonFactory()
private lazy val jsonInferSchema = new JsonInferSchema(jsonOptions)
// the child is evaluated without an input row, which is why it is required to be foldable
private lazy val json = child.eval().asInstanceOf[UTF8String]
// three outcomes: foldable with a non-null value, non-foldable (NON_FOLDABLE_INPUT), and
// foldable but null (UNEXPECTED_NULL)
override def checkInputDataTypes(): TypeCheckResult = {
if (child.foldable && json != null) {
} else if (!child.foldable) {
errorSubClass = "NON_FOLDABLE_INPUT",
messageParameters = Map(
"inputName" -> toSQLId("json"),
"inputType" -> toSQLType(child.dataType),
"inputExpr" -> toSQLExpr(child)))
} else {
errorSubClass = "UNEXPECTED_NULL",
messageParameters = Map("exprName" -> "json"))
// Infers the type of the JSON value and canonicalizes it; the input row `v` is not consulted
// in the visible code because the JSON comes from the pre-evaluated `json` field.
override def eval(v: InternalRow): Any = {
val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser =>
// To match with schema inference from JSON datasource.
jsonInferSchema.inferField(parser) match {
case st: StructType =>
jsonInferSchema.canonicalizeType(st, jsonOptions).getOrElse(StructType(Nil))
case at: ArrayType if at.elementType.isInstanceOf[StructType] =>
.canonicalizeType(at.elementType, jsonOptions)
.map(ArrayType(_, containsNull = at.containsNull))
.getOrElse(ArrayType(StructType(Nil), containsNull = at.containsNull))
case other: DataType =>
jsonInferSchema.canonicalizeType(other, jsonOptions).getOrElse(
override def prettyName: String = "schema_of_json"
override protected def withNewChildInternal(newChild: Expression): SchemaOfJson =
copy(child = newChild)
* A function that returns the number of elements in the outermost JSON array.
usage = "_FUNC_(jsonArray) - Returns the number of elements in the outermost JSON array.",
arguments = """
* jsonArray - A JSON array. `NULL` is returned in case of any other valid JSON string,
`NULL` or an invalid JSON.
examples = """
> SELECT _FUNC_('[1,2,3,4]');
> SELECT _FUNC_('[1,2,3,{"f1":1,"f2":[5,6]},4]');
> SELECT _FUNC_('[1,2');
group = "json_funcs",
since = "3.1.0"
// Expression behind `json_array_length`: counts the elements of the outermost JSON array in a
// string child; the result is a nullable integer.
case class LengthOfJsonArray(child: Expression) extends UnaryExpression
with CodegenFallback with ExpectsInputTypes {
override def inputTypes: Seq[AbstractDataType] = Seq(StringTypeAnyCollation)
override def dataType: DataType = IntegerType
override def nullable: Boolean = true
override def prettyName: String = "json_array_length"
// Returns null for null input, for input without any token, and when parsing raises a
// JsonProcessingException or IOException; otherwise delegates to `parseCounter`.
override def eval(input: InternalRow): Any = {
val json = child.eval(input).asInstanceOf[UTF8String]
// return null for null input
if (json == null) {
return null
try {
Utils.tryWithResource(CreateJacksonParser.utf8String(SharedFactory.jsonFactory, json)) {
parser => {
// return null if null array is encountered.
if (parser.nextToken() == null) {
return null
// Parse the array to compute its length.
parseCounter(parser, input)
} catch {
case _: JsonProcessingException | _: IOException => null
// Counts elements starting from the parser's current token, which must be START_ARRAY.
// NOTE(review): `input` is not used in the visible code.
private def parseCounter(parser: JsonParser, input: InternalRow): Any = {
var length = 0
// Only JSON arrays are supported for this function.
if (parser.currentToken != JsonToken.START_ARRAY) {
return null
// Keep traversing until the end of JSON array
while(parser.nextToken() != JsonToken.END_ARRAY) {
length += 1
// skip all the children of an inner object or array
override protected def withNewChildInternal(newChild: Expression): LengthOfJsonArray =
copy(child = newChild)
* A function which returns all the keys of the outermost JSON object.
usage = "_FUNC_(json_object) - Returns all the keys of the outermost JSON object as an array.",
arguments = """
* json_object - A JSON object. If a valid JSON object is given, all the keys of the outermost
object will be returned as an array. If it is any other valid JSON string, an invalid JSON
string or an empty string, the function returns null.
examples = """
> SELECT _FUNC_('{}');
> SELECT _FUNC_('{"key": "value"}');
> SELECT _FUNC_('{"f1":"abc","f2":{"f3":"a", "f4":"b"}}');
group = "json_funcs",
since = "3.1.0"
// Expression behind `json_object_keys`: returns the keys of the outermost JSON object in a
// string child as a nullable array of strings.
case class JsonObjectKeys(child: Expression) extends UnaryExpression with CodegenFallback
with ExpectsInputTypes {
override def inputTypes: Seq[AbstractDataType] = Seq(StringTypeAnyCollation)
override def dataType: DataType = ArrayType(SQLConf.get.defaultStringType)
override def nullable: Boolean = true
override def prettyName: String = "json_object_keys"
// Returns null for null input, for input whose first token is missing or is not START_OBJECT,
// and when parsing raises a JsonProcessingException or IOException; otherwise delegates to
// `getJsonKeys`.
override def eval(input: InternalRow): Any = {
val json = child.eval(input).asInstanceOf[UTF8String]
// return null for `NULL` input
if(json == null) {
return null
try {
Utils.tryWithResource(CreateJacksonParser.utf8String(SharedFactory.jsonFactory, json)) {
parser => {
// return null if an empty string or any other valid JSON string is encountered
if (parser.nextToken() == null || parser.currentToken() != JsonToken.START_OBJECT) {
return null
// Parse the JSON string to get all the keys of outermost JSON object
getJsonKeys(parser, input)
} catch {
case _: JsonProcessingException | _: IOException => null
// Collects the field name of each value the parser visits into an array of UTF8String.
// NOTE(review): `input` is not used in the visible code.
private def getJsonKeys(parser: JsonParser, input: InternalRow): GenericArrayData = {
val arrayBufferOfKeys = ArrayBuffer.empty[UTF8String]
// traverse until the end of input and ensure it returns valid key
while(parser.nextValue() != null && parser.currentName() != null) {
// add current fieldName to the ArrayBuffer
arrayBufferOfKeys += UTF8String.fromString(parser.currentName)
// skip all the children of inner object or array
new GenericArrayData(arrayBufferOfKeys.toArray)
override protected def withNewChildInternal(newChild: Expression): JsonObjectKeys =
copy(child = newChild)