| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.sql.execution.datasources.json |
| |
| import java.io._ |
| import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException} |
| import java.nio.file.Files |
| import java.sql.{Date, Timestamp} |
| import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId} |
| import java.util.Locale |
| import java.util.concurrent.atomic.AtomicLong |
| |
| import com.fasterxml.jackson.core.JsonFactory |
| import org.apache.commons.lang3.exception.ExceptionUtils |
| import org.apache.hadoop.fs.{Path, PathFilter} |
| import org.apache.hadoop.io.SequenceFile.CompressionType |
| import org.apache.hadoop.io.compress.GzipCodec |
| |
| import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, SparkUpgradeException, TestUtils} |
| import org.apache.spark.rdd.RDD |
| import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd} |
| import org.apache.spark.sql.{functions => F, _} |
| import org.apache.spark.sql.catalyst.json._ |
| import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, HadoopCompressionCodec} |
| import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP |
| import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLType |
| import org.apache.spark.sql.execution.ExternalRDD |
| import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, DataSource, InMemoryFileIndex, NoopCache} |
| import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder |
| import org.apache.spark.sql.internal.SQLConf |
| import org.apache.spark.sql.test.SharedSparkSession |
| import org.apache.spark.sql.types._ |
| import org.apache.spark.sql.types.StructType.fromDDL |
| import org.apache.spark.sql.types.TestUDT.{MyDenseVector, MyDenseVectorUDT} |
| import org.apache.spark.sql.util.CaseInsensitiveStringMap |
| import org.apache.spark.util.ArrayImplicits._ |
| import org.apache.spark.util.Utils |
| |
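| // A Hadoop PathFilter that skips every file whose parent directory is named "p=2". |
| // It is installed via the "mapred(uce).input.pathFilter.class" options in the |
| // "SPARK-11544 test pathfilter" test below. |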
| class TestFileFilter extends PathFilter { |
| override def accept(path: Path): Boolean = path.getParent.getName != "p=2" |
| } |
| |
| abstract class JsonSuite |
| extends QueryTest |
| with SharedSparkSession |
| with TestJsonData |
| with CommonFileDataSourceSuite { |
| |
| import testImplicits._ |
| |
| override protected def dataSourceFormat = "json" |
| |
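| // Many expected answers in this suite rely on lenient, non-ANSI type coercion, so ANSI |
| // mode is disabled by default; ANSI-sensitive cases guard on SQLConf.get.ansiEnabled. |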
| override protected def sparkConf: SparkConf = |
| super.sparkConf.set(SQLConf.ANSI_ENABLED.key, "false") |
| |
| test("Type promotion") { |
| def checkTypePromotion(expected: Any, actual: Any): Unit = { |
| assert(expected.getClass == actual.getClass, |
| s"Failed to promote ${actual.getClass} to ${expected.getClass}.") |
| assert(expected == actual, |
| s"Promoted value ${actual}(${actual.getClass}) does not equal the expected value " + |
| s"${expected}(${expected.getClass}).") |
| } |
| |
| val factory = new JsonFactory() |
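| // Serializes `value` to JSON with Jackson, then parses it back through JacksonParser's |
| // converter for `dataType`, returning the resulting Catalyst value. |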
| def enforceCorrectType( |
| value: Any, |
| dataType: DataType, |
| options: Map[String, String] = Map.empty): Any = { |
| val writer = new StringWriter() |
| Utils.tryWithResource(factory.createGenerator(writer)) { generator => |
| generator.writeObject(value) |
| generator.flush() |
| } |
| |
| val dummyOption = new JSONOptions(options, SQLConf.get.sessionLocalTimeZone) |
| val dummySchema = StructType(Seq.empty) |
| val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true) |
| |
| Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser => |
| jsonParser.nextToken() |
| val converter = parser.makeConverter(dataType) |
| converter.apply(jsonParser) |
| } |
| } |
| |
| val intNumber: Int = 2147483647 |
| checkTypePromotion(intNumber, enforceCorrectType(intNumber, IntegerType)) |
| checkTypePromotion(intNumber.toLong, enforceCorrectType(intNumber, LongType)) |
| checkTypePromotion(intNumber.toDouble, enforceCorrectType(intNumber, DoubleType)) |
| checkTypePromotion( |
| Decimal(intNumber), enforceCorrectType(intNumber, DecimalType.SYSTEM_DEFAULT)) |
| |
| val longNumber: Long = 9223372036854775807L |
| checkTypePromotion(longNumber, enforceCorrectType(longNumber, LongType)) |
| checkTypePromotion(longNumber.toDouble, enforceCorrectType(longNumber, DoubleType)) |
| checkTypePromotion( |
| Decimal(longNumber), enforceCorrectType(longNumber, DecimalType.SYSTEM_DEFAULT)) |
| |
| val doubleNumber: Double = 1.7976931348623157d |
| checkTypePromotion(doubleNumber, enforceCorrectType(doubleNumber, DoubleType)) |
| |
| checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber * 1000L)), |
| enforceCorrectType(intNumber, TimestampType)) |
| checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong * 1000L)), |
| enforceCorrectType(intNumber.toLong, TimestampType)) |
| val strTime = "2014-09-30 12:34:56" |
| checkTypePromotion( |
| DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)), |
| enforceCorrectType(strTime, TimestampType, |
| Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss"))) |
| |
| val strDate = "2014-10-15" |
| checkTypePromotion( |
| DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType)) |
| |
| val ISO8601Time1 = "1970-01-01T01:00:01.0Z" |
| checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)), |
| enforceCorrectType( |
| ISO8601Time1, |
| TimestampType, |
| Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SX"))) |
| val ISO8601Time2 = "1970-01-01T02:00:01-01:00" |
| checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)), |
| enforceCorrectType( |
| ISO8601Time2, |
| TimestampType, |
| Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ssXXX"))) |
| |
| val ISO8601Date = "1970-01-01" |
| checkTypePromotion(DateTimeUtils.microsToDays(32400000000L, ZoneId.systemDefault), |
| enforceCorrectType(ISO8601Date, DateType)) |
| } |
| |
| test("Get compatible type") { |
| def checkDataType(t1: DataType, t2: DataType, expected: DataType): Unit = { |
| var actual = JsonInferSchema.compatibleType(t1, t2) |
| assert(actual == expected, |
| s"Expected $expected as the most general data type for $t1 and $t2, found $actual") |
| actual = JsonInferSchema.compatibleType(t2, t1) |
| assert(actual == expected, |
| s"Expected $expected as the most general data type for $t1 and $t2, found $actual") |
| } |
| |
| // NullType |
| checkDataType(NullType, BooleanType, BooleanType) |
| checkDataType(NullType, IntegerType, IntegerType) |
| checkDataType(NullType, LongType, LongType) |
| checkDataType(NullType, DoubleType, DoubleType) |
| checkDataType(NullType, DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT) |
| checkDataType(NullType, StringType, StringType) |
| checkDataType(NullType, ArrayType(IntegerType), ArrayType(IntegerType)) |
| checkDataType(NullType, StructType(Nil), StructType(Nil)) |
| checkDataType(NullType, NullType, NullType) |
| |
| // BooleanType |
| checkDataType(BooleanType, BooleanType, BooleanType) |
| checkDataType(BooleanType, IntegerType, StringType) |
| checkDataType(BooleanType, LongType, StringType) |
| checkDataType(BooleanType, DoubleType, StringType) |
| checkDataType(BooleanType, DecimalType.SYSTEM_DEFAULT, StringType) |
| checkDataType(BooleanType, StringType, StringType) |
| checkDataType(BooleanType, ArrayType(IntegerType), StringType) |
| checkDataType(BooleanType, StructType(Nil), StringType) |
| |
| // IntegerType |
| checkDataType(IntegerType, IntegerType, IntegerType) |
| checkDataType(IntegerType, LongType, LongType) |
| checkDataType(IntegerType, DoubleType, DoubleType) |
| checkDataType(IntegerType, DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT) |
| checkDataType(IntegerType, StringType, StringType) |
| checkDataType(IntegerType, ArrayType(IntegerType), StringType) |
| checkDataType(IntegerType, StructType(Nil), StringType) |
| |
| // LongType |
| checkDataType(LongType, LongType, LongType) |
| checkDataType(LongType, DoubleType, DoubleType) |
| checkDataType(LongType, DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT) |
| checkDataType(LongType, StringType, StringType) |
| checkDataType(LongType, ArrayType(IntegerType), StringType) |
| checkDataType(LongType, StructType(Nil), StringType) |
| |
| // DoubleType |
| checkDataType(DoubleType, DoubleType, DoubleType) |
| checkDataType(DoubleType, DecimalType.SYSTEM_DEFAULT, DoubleType) |
| checkDataType(DoubleType, StringType, StringType) |
| checkDataType(DoubleType, ArrayType(IntegerType), StringType) |
| checkDataType(DoubleType, StructType(Nil), StringType) |
| |
| // DecimalType |
| checkDataType(DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT, |
| DecimalType.SYSTEM_DEFAULT) |
| checkDataType(DecimalType.SYSTEM_DEFAULT, StringType, StringType) |
| checkDataType(DecimalType.SYSTEM_DEFAULT, ArrayType(IntegerType), StringType) |
| checkDataType(DecimalType.SYSTEM_DEFAULT, StructType(Nil), StringType) |
| |
| // StringType |
| checkDataType(StringType, StringType, StringType) |
| checkDataType(StringType, ArrayType(IntegerType), StringType) |
| checkDataType(StringType, StructType(Nil), StringType) |
| |
| // ArrayType |
| checkDataType(ArrayType(IntegerType), ArrayType(IntegerType), ArrayType(IntegerType)) |
| checkDataType(ArrayType(IntegerType), ArrayType(LongType), ArrayType(LongType)) |
| checkDataType(ArrayType(IntegerType), ArrayType(StringType), ArrayType(StringType)) |
| checkDataType(ArrayType(IntegerType), StructType(Nil), StringType) |
| checkDataType( |
| ArrayType(IntegerType, true), ArrayType(IntegerType), ArrayType(IntegerType, true)) |
| checkDataType( |
| ArrayType(IntegerType, true), ArrayType(IntegerType, false), ArrayType(IntegerType, true)) |
| checkDataType( |
| ArrayType(IntegerType, true), ArrayType(IntegerType, true), ArrayType(IntegerType, true)) |
| checkDataType( |
| ArrayType(IntegerType, false), ArrayType(IntegerType), ArrayType(IntegerType, true)) |
| checkDataType( |
| ArrayType(IntegerType, false), ArrayType(IntegerType, false), ArrayType(IntegerType, false)) |
| checkDataType( |
| ArrayType(IntegerType, false), ArrayType(IntegerType, true), ArrayType(IntegerType, true)) |
| |
| // StructType |
| checkDataType(StructType(Nil), StructType(Nil), StructType(Nil)) |
| checkDataType( |
| StructType(StructField("f1", IntegerType, true) :: Nil), |
| StructType(StructField("f1", IntegerType, true) :: Nil), |
| StructType(StructField("f1", IntegerType, true) :: Nil)) |
| checkDataType( |
| StructType(StructField("f1", IntegerType, true) :: Nil), |
| StructType(Nil), |
| StructType(StructField("f1", IntegerType, true) :: Nil)) |
| checkDataType( |
| StructType( |
| StructField("f1", IntegerType, true) :: |
| StructField("f2", IntegerType, true) :: Nil), |
| StructType(StructField("f1", LongType, true) :: Nil), |
| StructType( |
| StructField("f1", LongType, true) :: |
| StructField("f2", IntegerType, true) :: Nil)) |
| checkDataType( |
| StructType( |
| StructField("f1", IntegerType, true) :: Nil), |
| StructType( |
| StructField("f2", IntegerType, true) :: Nil), |
| StructType( |
| StructField("f1", IntegerType, true) :: |
| StructField("f2", IntegerType, true) :: Nil)) |
| checkDataType( |
| StructType( |
| StructField("f1", IntegerType, true) :: Nil), |
| DecimalType.SYSTEM_DEFAULT, |
| StringType) |
| } |
| |
| test("Complex field and type inferring with null in sampling") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.json(jsonNullStruct) |
| val expectedSchema = StructType( |
| StructField("headers", StructType( |
| StructField("Charset", StringType, true) :: |
| StructField("Host", StringType, true) :: Nil), true) :: |
| StructField("ip", StringType, true) :: |
| StructField("nullstr", StringType, true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| checkAnswer( |
| sql("select nullstr, headers.Host from jsonTable"), |
| Seq(Row("", "1.abc.com"), Row("", null), Row("", null), Row(null, null)) |
| ) |
| } |
| } |
| |
| test("Primitive field and type inferring") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.json(primitiveFieldAndType) |
| |
| val expectedSchema = StructType( |
| StructField("bigInteger", DecimalType(20, 0), true) :: |
| StructField("boolean", BooleanType, true) :: |
| StructField("double", DoubleType, true) :: |
| StructField("integer", LongType, true) :: |
| StructField("long", LongType, true) :: |
| StructField("null", StringType, true) :: |
| StructField("string", StringType, true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| checkAnswer( |
| sql("select * from jsonTable"), |
| Row(new java.math.BigDecimal("92233720368547758070"), |
| true, |
| 1.7976931348623157, |
| 10, |
| 21474836470L, |
| null, |
| "this is a simple string.") |
| ) |
| } |
| } |
| |
| test("Complex field and type inferring") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.json(complexFieldAndType1) |
| |
| val expectedSchema = StructType( |
| StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) :: |
| StructField("arrayOfArray2", ArrayType(ArrayType(DoubleType, true), true), true) :: |
| StructField("arrayOfBigInteger", ArrayType(DecimalType(21, 0), true), true) :: |
| StructField("arrayOfBoolean", ArrayType(BooleanType, true), true) :: |
| StructField("arrayOfDouble", ArrayType(DoubleType, true), true) :: |
| StructField("arrayOfInteger", ArrayType(LongType, true), true) :: |
| StructField("arrayOfLong", ArrayType(LongType, true), true) :: |
| StructField("arrayOfNull", ArrayType(StringType, true), true) :: |
| StructField("arrayOfString", ArrayType(StringType, true), true) :: |
| StructField("arrayOfStruct", ArrayType( |
| StructType( |
| StructField("field1", BooleanType, true) :: |
| StructField("field2", StringType, true) :: |
| StructField("field3", StringType, true) :: Nil), true), true) :: |
| StructField("struct", StructType( |
| StructField("field1", BooleanType, true) :: |
| StructField("field2", DecimalType(20, 0), true) :: Nil), true) :: |
| StructField("structWithArrayFields", StructType( |
| StructField("field1", ArrayType(LongType, true), true) :: |
| StructField("field2", ArrayType(StringType, true), true) :: Nil), true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| // Access elements of a primitive array. |
| checkAnswer( |
| sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from jsonTable"), |
| Row("str1", "str2", null) |
| ) |
| |
| // Access an array of null values. |
| checkAnswer( |
| sql("select arrayOfNull from jsonTable"), |
| Row(Seq(null, null, null, null)) |
| ) |
| |
| // Access elements of a BigInteger array (we use DecimalType internally). |
| checkAnswer( |
| sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] from " + |
| "jsonTable"), |
| Row(new java.math.BigDecimal("922337203685477580700"), |
| new java.math.BigDecimal("-922337203685477580800"), null) |
| ) |
| |
| // Access elements of an array of arrays. |
| checkAnswer( |
| sql("select arrayOfArray1[0], arrayOfArray1[1] from jsonTable"), |
| Row(Seq("1", "2", "3"), Seq("str1", "str2")) |
| ) |
| |
| // Access elements of an array of arrays. |
| checkAnswer( |
| sql("select arrayOfArray2[0], arrayOfArray2[1] from jsonTable"), |
| Row(Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1)) |
| ) |
| |
| // Access elements of an array inside a field of type ArrayType(ArrayType). |
| checkAnswer( |
| sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from jsonTable"), |
| Row("str2", 2.1) |
| ) |
| |
| // Access elements of an array of structs. |
| checkAnswer( |
| sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2], arrayOfStruct[3] " + |
| "from jsonTable"), |
| Row( |
| Row(true, "str1", null), |
| Row(false, null, null), |
| Row(null, null, null), |
| null) |
| ) |
| |
| // Access a struct and fields inside of it. |
| checkAnswer( |
| sql("select struct, struct.field1, struct.field2 from jsonTable"), |
| Row( |
| Row(true, new java.math.BigDecimal("92233720368547758070")), |
| true, |
| new java.math.BigDecimal("92233720368547758070")) :: Nil |
| ) |
| |
| // Access an array field of a struct. |
| checkAnswer( |
| sql("select structWithArrayFields.field1, structWithArrayFields.field2 from jsonTable"), |
| Row(Seq(4, 5, 6), Seq("str1", "str2")) |
| ) |
| |
| // Access elements of an array field of a struct. |
| checkAnswer( |
| sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from " + |
| "jsonTable"), |
| Row(5, null) |
| ) |
| } |
| } |
| |
| test("GetField operation on complex data type") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.json(complexFieldAndType1) |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| checkAnswer( |
| sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"), |
| Row(true, "str1") |
| ) |
| |
| // Getting all values of a specific field from an array of structs. |
| checkAnswer( |
| sql("select arrayOfStruct.field1, arrayOfStruct.field2 from jsonTable"), |
| Row(Seq(true, false, null), Seq("str1", null, null)) |
| ) |
| } |
| } |
| |
| test("Type conflict in primitive field values") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.json(primitiveFieldValueTypeConflict) |
| |
| val expectedSchema = StructType( |
| StructField("num_bool", StringType, true) :: |
| StructField("num_num_1", LongType, true) :: |
| StructField("num_num_2", DoubleType, true) :: |
| StructField("num_num_3", DoubleType, true) :: |
| StructField("num_str", StringType, true) :: |
| StructField("str_bool", StringType, true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| checkAnswer( |
| sql("select * from jsonTable"), |
| Row("true", 11L, null, 1.1, "13.1", "str1") :: |
| Row("12", null, 21474836470.9, null, null, "true") :: |
| Row("false", 21474836470L, 92233720368547758070d, 100, "str1", "false") :: |
| Row(null, 21474836570L, 1.1, 21474836470L, "92233720368547758070", null) :: Nil |
| ) |
| |
| // Widening to LongType |
| checkAnswer( |
| sql("select num_num_1 - 100 from jsonTable where num_num_1 > 11"), |
| Row(21474836370L) :: Row(21474836470L) :: Nil |
| ) |
| |
| checkAnswer( |
| sql("select num_num_1 - 100 from jsonTable where num_num_1 > 10"), |
| Row(-89) :: Row(21474836370L) :: Row(21474836470L) :: Nil |
| ) |
| |
| // Widening to DoubleType (double + decimal literal is evaluated as double) |
| checkAnswer( |
| sql("select num_num_2 + 1.3 from jsonTable where num_num_2 > 1.1"), |
| Row(21474836472.2) :: |
| Row(92233720368547758071.3) :: Nil |
| ) |
| |
| // Widening to Double |
| checkAnswer( |
| sql("select num_num_3 + 1.2 from jsonTable where num_num_3 > 1.1"), |
| Row(101.2) :: Row(21474836471.2) :: Nil |
| ) |
| |
| // The following tests are about type coercion rather than the JSON data source itself. |
| // Here we simply focus on the non-ANSI behavior. |
| if (!SQLConf.get.ansiEnabled) { |
| // Number and Boolean conflict: resolve the type as number in this query. |
| checkAnswer( |
| sql("select num_bool - 10 from jsonTable where num_bool > 11"), |
| Row(2) |
| ) |
| |
| // Number and String conflict: resolve the type as number in this query. |
| checkAnswer( |
| sql("select num_str + 1.2 from jsonTable where num_str > 14d"), |
| Row(92233720368547758071.2) |
| ) |
| |
| // Number and String conflict: resolve the type as number in this query. |
| checkAnswer( |
| sql("select num_str + 1.2 from jsonTable where num_str >= 92233720368547758060"), |
| Row(new java.math.BigDecimal("92233720368547758071.2").doubleValue) |
| ) |
| } |
| |
| // String and Boolean conflict: resolve the type as string. |
| checkAnswer( |
| sql("select * from jsonTable where str_bool = 'str1'"), |
| Row("true", 11L, null, 1.1, "13.1", "str1") |
| ) |
| } |
| } |
| |
| test("Type conflict in complex field values") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.json(complexFieldValueTypeConflict) |
| |
| val expectedSchema = StructType( |
| StructField("array", ArrayType(LongType, true), true) :: |
| StructField("num_struct", StringType, true) :: |
| StructField("str_array", StringType, true) :: |
| StructField("struct", StructType( |
| StructField("field", StringType, true) :: Nil), true) :: |
| StructField("struct_array", StringType, true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| checkAnswer( |
| sql("select * from jsonTable"), |
| Row(Seq(), "11", "[1,2,3]", Row(null), "[]") :: |
| Row(null, """{"field":false}""", null, null, "{}") :: |
| Row(Seq(4, 5, 6), null, "str", Row(null), "[7,8,9]") :: |
| Row(Seq(7), "{}", """["str1","str2",33]""", Row("str"), """{"field":true}""") :: Nil |
| ) |
| } |
| } |
| |
| test("Type conflict in array elements") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.json(arrayElementTypeConflict) |
| |
| val expectedSchema = StructType( |
| StructField("array1", ArrayType(StringType, true), true) :: |
| StructField("array2", ArrayType(StructType( |
| StructField("field", LongType, true) :: Nil), true), true) :: |
| StructField("array3", ArrayType(StringType, true), true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| checkAnswer( |
| sql("select * from jsonTable"), |
| Row(Seq("1", "1.1", "true", null, "[]", "{}", "[2,3,4]", |
| """{"field":"str"}"""), Seq(Row(214748364700L), Row(1)), null) :: |
| Row(null, null, Seq("""{"field":"str"}""", """{"field":1}""")) :: |
| Row(null, null, Seq("1", "2", "3")) :: Nil |
| ) |
| |
| // Treat an element as a number. |
| checkAnswer( |
| sql("select array1[0] + 1 from jsonTable where array1 is not null"), |
| Row(2) |
| ) |
| } |
| } |
| |
| test("Handling missing fields") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.json(missingFields) |
| |
| val expectedSchema = StructType( |
| StructField("a", BooleanType, true) :: |
| StructField("b", LongType, true) :: |
| StructField("c", ArrayType(LongType, true), true) :: |
| StructField("d", StructType( |
| StructField("field", BooleanType, true) :: Nil), true) :: |
| StructField("e", StringType, true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| |
| jsonDF.createOrReplaceTempView("jsonTable") |
| } |
| } |
| |
| test("Loading a JSON dataset from a text file") { |
| withTempView("jsonTable") { |
| val dir = Utils.createTempDir() |
| dir.delete() |
| val path = dir.getCanonicalPath |
| primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path) |
| val jsonDF = spark.read.json(path) |
| |
| val expectedSchema = StructType( |
| StructField("bigInteger", DecimalType(20, 0), true) :: |
| StructField("boolean", BooleanType, true) :: |
| StructField("double", DoubleType, true) :: |
| StructField("integer", LongType, true) :: |
| StructField("long", LongType, true) :: |
| StructField("null", StringType, true) :: |
| StructField("string", StringType, true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| checkAnswer( |
| sql("select * from jsonTable"), |
| Row(new java.math.BigDecimal("92233720368547758070"), |
| true, |
| 1.7976931348623157, |
| 10, |
| 21474836470L, |
| null, |
| "this is a simple string.") |
| ) |
| } |
| } |
| |
| test("Loading a JSON dataset primitivesAsString returns schema with primitive types as strings") { |
| withTempView("jsonTable") { |
| val dir = Utils.createTempDir() |
| dir.delete() |
| val path = dir.getCanonicalPath |
| primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path) |
| val jsonDF = spark.read.option("primitivesAsString", "true").json(path) |
| |
| val expectedSchema = StructType( |
| StructField("bigInteger", StringType, true) :: |
| StructField("boolean", StringType, true) :: |
| StructField("double", StringType, true) :: |
| StructField("integer", StringType, true) :: |
| StructField("long", StringType, true) :: |
| StructField("null", StringType, true) :: |
| StructField("string", StringType, true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| checkAnswer( |
| sql("select * from jsonTable"), |
| Row("92233720368547758070", |
| "true", |
| "1.7976931348623157", |
| "10", |
| "21474836470", |
| null, |
| "this is a simple string.") |
| ) |
| } |
| } |
| |
| test("Loading a JSON dataset primitivesAsString returns complex fields as strings") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.option("primitivesAsString", "true").json(complexFieldAndType1) |
| |
| val expectedSchema = StructType( |
| StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) :: |
| StructField("arrayOfArray2", ArrayType(ArrayType(StringType, true), true), true) :: |
| StructField("arrayOfBigInteger", ArrayType(StringType, true), true) :: |
| StructField("arrayOfBoolean", ArrayType(StringType, true), true) :: |
| StructField("arrayOfDouble", ArrayType(StringType, true), true) :: |
| StructField("arrayOfInteger", ArrayType(StringType, true), true) :: |
| StructField("arrayOfLong", ArrayType(StringType, true), true) :: |
| StructField("arrayOfNull", ArrayType(StringType, true), true) :: |
| StructField("arrayOfString", ArrayType(StringType, true), true) :: |
| StructField("arrayOfStruct", ArrayType( |
| StructType( |
| StructField("field1", StringType, true) :: |
| StructField("field2", StringType, true) :: |
| StructField("field3", StringType, true) :: Nil), true), true) :: |
| StructField("struct", StructType( |
| StructField("field1", StringType, true) :: |
| StructField("field2", StringType, true) :: Nil), true) :: |
| StructField("structWithArrayFields", StructType( |
| StructField("field1", ArrayType(StringType, true), true) :: |
| StructField("field2", ArrayType(StringType, true), true) :: Nil), true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| // Access elements of a primitive array. |
| checkAnswer( |
| sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from jsonTable"), |
| Row("str1", "str2", null) |
| ) |
| |
| // Access an array of null values. |
| checkAnswer( |
| sql("select arrayOfNull from jsonTable"), |
| Row(Seq(null, null, null, null)) |
| ) |
| |
| // Access elements of a BigInteger array (we use DecimalType internally). |
| checkAnswer( |
| sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] from " + |
| "jsonTable"), |
| Row("922337203685477580700", "-922337203685477580800", null) |
| ) |
| |
| // Access elements of an array of arrays. |
| checkAnswer( |
| sql("select arrayOfArray1[0], arrayOfArray1[1] from jsonTable"), |
| Row(Seq("1", "2", "3"), Seq("str1", "str2")) |
| ) |
| |
| // Access elements of an array of arrays. |
| checkAnswer( |
| sql("select arrayOfArray2[0], arrayOfArray2[1] from jsonTable"), |
| Row(Seq("1", "2", "3"), Seq("1.1", "2.1", "3.1")) |
| ) |
| |
| // Access elements of an array inside a field of type ArrayType(ArrayType). |
| checkAnswer( |
| sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from jsonTable"), |
| Row("str2", "2.1") |
| ) |
| |
| // Access elements of an array of structs. |
| checkAnswer( |
| sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2], arrayOfStruct[3] " + |
| "from jsonTable"), |
| Row( |
| Row("true", "str1", null), |
| Row("false", null, null), |
| Row(null, null, null), |
| null) |
| ) |
| |
| // Access a struct and fields inside of it. |
| checkAnswer( |
| sql("select struct, struct.field1, struct.field2 from jsonTable"), |
| Row( |
| Row("true", "92233720368547758070"), |
| "true", |
| "92233720368547758070") :: Nil |
| ) |
| |
| // Access an array field of a struct. |
| checkAnswer( |
| sql("select structWithArrayFields.field1, structWithArrayFields.field2 from jsonTable"), |
| Row(Seq("4", "5", "6"), Seq("str1", "str2")) |
| ) |
| |
| // Access elements of an array field of a struct. |
| checkAnswer( |
| sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from " + |
| "jsonTable"), |
| Row("5", null) |
| ) |
| } |
| } |
| |
| test("Loading a JSON dataset prefersDecimal returns schema with float types as BigDecimal") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.option("prefersDecimal", "true").json(primitiveFieldAndType) |
| |
| val expectedSchema = StructType( |
| StructField("bigInteger", DecimalType(20, 0), true) :: |
| StructField("boolean", BooleanType, true) :: |
| StructField("double", DecimalType(17, 16), true) :: |
| StructField("integer", LongType, true) :: |
| StructField("long", LongType, true) :: |
| StructField("null", StringType, true) :: |
| StructField("string", StringType, true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| checkAnswer( |
| sql("select * from jsonTable"), |
| Row(BigDecimal("92233720368547758070"), |
| true, |
| BigDecimal("1.7976931348623157"), |
| 10, |
| 21474836470L, |
| null, |
| "this is a simple string.") |
| ) |
| } |
| } |
| |
| test("Find compatible types even if inferred DecimalType is not capable of other IntegralType") { |
| val mixedIntegerAndDoubleRecords = Seq( |
| """{"a": 3, "b": 1.1}""", |
| s"""{"a": 3.1, "b": 0.${"0" * 38}1}""").toDS() |
| val jsonDF = spark.read |
| .option("prefersDecimal", "true") |
| .json(mixedIntegerAndDoubleRecords) |
| |
| // The values in the `a` field will be decimals as they fit in decimal. The values in the |
| // `b` field will be doubles as `1.0E-39D` does not fit in decimal. |
| val expectedSchema = StructType( |
| StructField("a", DecimalType(21, 1), true) :: |
| StructField("b", DoubleType, true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| checkAnswer( |
| jsonDF, |
| Row(BigDecimal("3"), 1.1D) :: |
| Row(BigDecimal("3.1"), 1.0E-39D) :: Nil |
| ) |
| } |
| |
| test("Infer big integers correctly even when it does not fit in decimal") { |
| val jsonDF = spark.read |
| .json(bigIntegerRecords) |
| |
| // The value in the `a` field will be a double as it does not fit in decimal. The value in |
| // the `b` field will be a decimal as `92233720368547758070` fits in decimal. |
| val expectedSchema = StructType( |
| StructField("a", DoubleType, true) :: |
| StructField("b", DecimalType(20, 0), true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| checkAnswer(jsonDF, Row(1.0E38D, BigDecimal("92233720368547758070"))) |
| } |
| |
| test("Infer floating-point values correctly even when it does not fit in decimal") { |
| val jsonDF = spark.read |
| .option("prefersDecimal", "true") |
| .json(floatingValueRecords) |
| |
| // The value in the `a` field will be a double as it does not fit in decimal. The value in |
| // the `b` field will be the decimal `0.01`, whose precision equals its scale. |
| val expectedSchema = StructType( |
| StructField("a", DoubleType, true) :: |
| StructField("b", DecimalType(2, 2), true) :: Nil) |
| |
| assert(expectedSchema === jsonDF.schema) |
| checkAnswer(jsonDF, Row(1.0E-39D, BigDecimal("0.01"))) |
| |
| val mergedJsonDF = spark.read |
| .option("prefersDecimal", "true") |
| .json(floatingValueRecords.union(bigIntegerRecords)) |
| |
| val expectedMergedSchema = StructType( |
| StructField("a", DoubleType, true) :: |
| StructField("b", DecimalType(22, 2), true) :: Nil) |
| |
| assert(expectedMergedSchema === mergedJsonDF.schema) |
| checkAnswer( |
| mergedJsonDF, |
| Row(1.0E-39D, BigDecimal("0.01")) :: |
| Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil |
| ) |
| } |
| |
| test("Loading a JSON dataset from a text file with SQL") { |
| val dir = Utils.createTempDir() |
| dir.delete() |
| val path = dir.toURI.toString |
| primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path) |
| |
| sql( |
| s""" |
| |CREATE TEMPORARY VIEW jsonTableSQL |
| |USING org.apache.spark.sql.json |
| |OPTIONS ( |
| | path '$path' |
| |) |
| """.stripMargin) |
| |
| checkAnswer( |
| sql("select * from jsonTableSQL"), |
| Row(new java.math.BigDecimal("92233720368547758070"), |
| true, |
| 1.7976931348623157, |
| 10, |
| 21474836470L, |
| null, |
| "this is a simple string.") |
| ) |
| } |
| |
| test("Applying schemas") { |
| withTempView("jsonTable1", "jsonTable2") { |
| val dir = Utils.createTempDir() |
| dir.delete() |
| val path = dir.getCanonicalPath |
| primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path) |
| |
| val schema = StructType( |
| StructField("bigInteger", DecimalType.SYSTEM_DEFAULT, true) :: |
| StructField("boolean", BooleanType, true) :: |
| StructField("double", DoubleType, true) :: |
| StructField("integer", IntegerType, true) :: |
| StructField("long", LongType, true) :: |
| StructField("null", StringType, true) :: |
| StructField("string", StringType, true) :: Nil) |
| |
| val jsonDF1 = spark.read.schema(schema).json(path) |
| |
| assert(schema === jsonDF1.schema) |
| |
| jsonDF1.createOrReplaceTempView("jsonTable1") |
| |
| checkAnswer( |
| sql("select * from jsonTable1"), |
| Row(new java.math.BigDecimal("92233720368547758070"), |
| true, |
| 1.7976931348623157, |
| 10, |
| 21474836470L, |
| null, |
| "this is a simple string.") |
| ) |
| |
| val jsonDF2 = spark.read.schema(schema).json(primitiveFieldAndType) |
| |
| assert(schema === jsonDF2.schema) |
| |
| jsonDF2.createOrReplaceTempView("jsonTable2") |
| |
| checkAnswer( |
| sql("select * from jsonTable2"), |
| Row(new java.math.BigDecimal("92233720368547758070"), |
| true, |
| 1.7976931348623157, |
| 10, |
| 21474836470L, |
| null, |
| "this is a simple string.") |
| ) |
| } |
| } |
| |
| test("Applying schemas with MapType") { |
| withTempView("jsonWithSimpleMap", "jsonWithComplexMap") { |
| val schemaWithSimpleMap = StructType( |
| StructField("map", MapType(StringType, IntegerType, true), false) :: Nil) |
| val jsonWithSimpleMap = spark.read.schema(schemaWithSimpleMap).json(mapType1) |
| |
| jsonWithSimpleMap.createOrReplaceTempView("jsonWithSimpleMap") |
| |
| checkAnswer( |
| sql("select `map` from jsonWithSimpleMap"), |
| Row(Map("a" -> 1)) :: |
| Row(Map("b" -> 2)) :: |
| Row(Map("c" -> 3)) :: |
| Row(Map("c" -> 1, "d" -> 4)) :: |
| Row(Map("e" -> null)) :: Nil |
| ) |
| |
| withSQLConf(SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "false") { |
| checkAnswer( |
| sql("select `map`['c'] from jsonWithSimpleMap"), |
| Row(null) :: |
| Row(null) :: |
| Row(3) :: |
| Row(1) :: |
| Row(null) :: Nil |
| ) |
| } |
| |
| val innerStruct = StructType( |
| StructField("field1", ArrayType(IntegerType, true), true) :: |
| StructField("field2", IntegerType, true) :: Nil) |
| val schemaWithComplexMap = StructType( |
| StructField("map", MapType(StringType, innerStruct, true), false) :: Nil) |
| |
| val jsonWithComplexMap = spark.read.schema(schemaWithComplexMap).json(mapType2) |
| |
| jsonWithComplexMap.createOrReplaceTempView("jsonWithComplexMap") |
| |
| checkAnswer( |
| sql("select `map` from jsonWithComplexMap"), |
| Row(Map("a" -> Row(Seq(1, 2, 3, null), null))) :: |
| Row(Map("b" -> Row(null, 2))) :: |
| Row(Map("c" -> Row(Seq(), 4))) :: |
| Row(Map("c" -> Row(null, 3), "d" -> Row(Seq(null), null))) :: |
| Row(Map("e" -> null)) :: |
| Row(Map("f" -> Row(null, null))) :: Nil |
| ) |
| |
| withSQLConf(SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "false") { |
| checkAnswer( |
| sql("select `map`['a'].field1, `map`['c'].field2 from jsonWithComplexMap"), |
| Row(Seq(1, 2, 3, null), null) :: |
| Row(null, null) :: |
| Row(null, 4) :: |
| Row(null, 3) :: |
| Row(null, null) :: |
| Row(null, null) :: Nil |
| ) |
| } |
| } |
| } |
| |
| test("SPARK-2096 Correctly parse dot notations") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.json(complexFieldAndType2) |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| checkAnswer( |
| sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"), |
| Row(true, "str1") |
| ) |
| checkAnswer( |
| sql( |
| """ |
| |select complexArrayOfStruct[0].field1[1].inner2[0], |
| |complexArrayOfStruct[1].field2[0][1] |
| |from jsonTable |
| """.stripMargin), |
| Row("str2", 6) |
| ) |
| } |
| } |
| |
| test("SPARK-3390 Complex arrays") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.json(complexFieldAndType2) |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| checkAnswer( |
| sql( |
| """ |
| |select arrayOfArray1[0][0][0], arrayOfArray1[1][0][1], arrayOfArray1[1][1][0] |
| |from jsonTable |
| """.stripMargin), |
| Row(5, 7, 8) |
| ) |
| checkAnswer( |
| sql( |
| """ |
| |select arrayOfArray2[0][0][0].inner1, arrayOfArray2[1][0], |
| |arrayOfArray2[1][1][1].inner2[0], arrayOfArray2[2][0][0].inner3[0][0].inner4 |
| |from jsonTable |
| """.stripMargin), |
| Row("str1", Nil, "str4", 2) |
| ) |
| } |
| } |
| |
| test("SPARK-3308 Read top level JSON arrays") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.json(jsonArray) |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| checkAnswer( |
| sql( |
| """ |
| |select a, b, c |
| |from jsonTable |
| """.stripMargin), |
| Row("str_a_1", null, null) :: |
| Row("str_a_2", null, null) :: |
| Row(null, "str_b_3", null) :: |
| Row("str_a_4", "str_b_4", "str_c_4") :: Nil |
| ) |
| } |
| } |
| |
| test("Corrupt records: FAILFAST mode") { |
| // `FAILFAST` mode should throw an exception for corrupt records. |
| checkError( |
| exception = intercept[SparkException] { |
| spark.read |
| .option("mode", "FAILFAST") |
| .json(corruptRecords) |
| }, |
| errorClass = "_LEGACY_ERROR_TEMP_2165", |
| parameters = Map("failFastMode" -> "FAILFAST") |
| ) |
| |
| checkError( |
| exception = intercept[SparkException] { |
| spark.read |
| .option("mode", "FAILFAST") |
| .schema("a string") |
| .json(corruptRecords) |
| .collect() |
| }, |
| errorClass = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION", |
| parameters = Map( |
| "badRecord" -> "[null]", |
| "failFastMode" -> "FAILFAST") |
| ) |
| } |
| |
| test("Corrupt records: DROPMALFORMED mode") { |
| val schemaOne = StructType( |
| StructField("a", StringType, true) :: |
| StructField("b", StringType, true) :: |
| StructField("c", StringType, true) :: Nil) |
| val schemaTwo = StructType( |
| StructField("a", StringType, true) :: Nil) |
| // `DROPMALFORMED` mode should skip corrupt records |
| val jsonDFOne = spark.read |
| .option("mode", "DROPMALFORMED") |
| .json(corruptRecords) |
| checkAnswer( |
| jsonDFOne, |
| Row("str_a_4", "str_b_4", "str_c_4") :: Nil |
| ) |
| assert(jsonDFOne.schema === schemaOne) |
| |
| val jsonDFTwo = spark.read |
| .option("mode", "DROPMALFORMED") |
| .schema(schemaTwo) |
| .json(corruptRecords) |
| checkAnswer( |
| jsonDFTwo, |
| Row("str_a_4") :: Nil) |
| assert(jsonDFTwo.schema === schemaTwo) |
| } |
| |
| test("SPARK-19641: Additional corrupt records: DROPMALFORMED mode") { |
| val schema = new StructType().add("dummy", StringType) |
| // `DROPMALFORMED` mode should skip corrupt records |
| val jsonDF = spark.read |
| .option("mode", "DROPMALFORMED") |
| .json(additionalCorruptRecords) |
| checkAnswer( |
| jsonDF, |
| Row("test")) |
| assert(jsonDF.schema === schema) |
| } |
| |
| test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") { |
| val schema = StructType( |
| StructField("a", StringType, true) :: |
| StructField("b", StringType, true) :: |
| StructField("c", StringType, true) :: Nil) |
| |
| val jsonDF = spark.read.schema(schema).json(corruptRecords) |
| |
| checkAnswer( |
| jsonDF.select($"a", $"b", $"c"), |
| Seq( |
| // Corrupted records are replaced with null |
| Row(null, null, null), |
| Row(null, null, null), |
| Row(null, null, null), |
| Row("str_a_4", "str_b_4", "str_c_4"), |
| Row(null, null, null)) |
| ) |
| } |
| |
| test("Corrupt records: PERMISSIVE mode, with designated column for malformed records") { |
| // Test if we can query corrupt records. |
| withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { |
| val jsonDF = spark.read.json(corruptRecords) |
| val schema = StructType( |
| StructField("_unparsed", StringType, true) :: |
| StructField("a", StringType, true) :: |
| StructField("b", StringType, true) :: |
| StructField("c", StringType, true) :: Nil) |
| |
| assert(schema === jsonDF.schema) |
| |
| // In HiveContext, backticks should be used to access columns starting with an underscore. |
| checkAnswer( |
| jsonDF.select($"a", $"b", $"c", $"_unparsed"), |
| Row(null, null, null, "{") :: |
| Row(null, null, null, """{"a":1, b:2}""") :: |
| Row(null, null, null, """{"a":{, b:3}""") :: |
| Row("str_a_4", "str_b_4", "str_c_4", null) :: |
| Row(null, null, null, "]") :: Nil |
| ) |
| |
| checkAnswer( |
| jsonDF.filter($"_unparsed".isNull).select($"a", $"b", $"c"), |
| Row("str_a_4", "str_b_4", "str_c_4") |
| ) |
| |
| checkAnswer( |
| jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"), |
| Row("{") :: |
| Row("""{"a":1, b:2}""") :: |
| Row("""{"a":{, b:3}""") :: |
| Row("]") :: Nil |
| ) |
| } |
| } |
| |
| test("SPARK-13953 Rename the corrupt record field via option") { |
| val jsonDF = spark.read |
| .option("columnNameOfCorruptRecord", "_malformed") |
| .json(corruptRecords) |
| val schema = StructType( |
| StructField("_malformed", StringType, true) :: |
| StructField("a", StringType, true) :: |
| StructField("b", StringType, true) :: |
| StructField("c", StringType, true) :: Nil) |
| |
| assert(schema === jsonDF.schema) |
| checkAnswer( |
| jsonDF.selectExpr("a", "b", "c", "_malformed"), |
| Row(null, null, null, "{") :: |
| Row(null, null, null, """{"a":1, b:2}""") :: |
| Row(null, null, null, """{"a":{, b:3}""") :: |
| Row("str_a_4", "str_b_4", "str_c_4", null) :: |
| Row(null, null, null, "]") :: Nil |
| ) |
| } |
| |
| test("SPARK-4068: nulls in arrays") { |
| withTempView("jsonTable") { |
| val jsonDF = spark.read.json(nullsInArrays) |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| val schema = StructType( |
| StructField("field1", |
| ArrayType(ArrayType(ArrayType(ArrayType(StringType, true), true), true), true), true) :: |
| StructField("field2", |
| ArrayType(ArrayType( |
| StructType(StructField("Test", LongType, true) :: Nil), true), true), true) :: |
| StructField("field3", |
| ArrayType(ArrayType( |
| StructType(StructField("Test", StringType, true) :: Nil), true), true), true) :: |
| StructField("field4", |
| ArrayType(ArrayType(ArrayType(LongType, true), true), true), true) :: Nil) |
| |
| assert(schema === jsonDF.schema) |
| |
| checkAnswer( |
| sql( |
| """ |
| |SELECT field1, field2, field3, field4 |
| |FROM jsonTable |
| """.stripMargin), |
| Row(Seq(Seq(null), Seq(Seq(Seq("Test")))), null, null, null) :: |
| Row(null, Seq(null, Seq(Row(1))), null, null) :: |
| Row(null, null, Seq(Seq(null), Seq(Row("2"))), null) :: |
| Row(null, null, null, Seq(Seq(null, Seq(1, 2, 3)))) :: Nil |
| ) |
| } |
| } |
| |
| test("SPARK-4228 DataFrame to JSON") { |
| withTempView("applySchema1", "applySchema2", "primitiveTable", "complexTable") { |
| val schema1 = StructType( |
| StructField("f1", IntegerType, false) :: |
| StructField("f2", StringType, false) :: |
| StructField("f3", BooleanType, false) :: |
| StructField("f4", ArrayType(StringType), nullable = true) :: |
| StructField("f5", IntegerType, true) :: Nil) |
| |
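| // Each line of unparsedStrings has the shape "<int>, <string>, <boolean>, <int or null>"; |
| // a non-numeric fourth token becomes null in f5. |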
| val rowRDD1 = unparsedStrings.map { r => |
| val values = r.split(",").map(_.trim) |
| val v5 = try values(3).toInt catch { |
| case _: NumberFormatException => null |
| } |
| Row(values(0).toInt, values(1), values(2).toBoolean, r.split(",").toList, v5) |
| } |
| |
| val df1 = spark.createDataFrame(rowRDD1, schema1) |
| df1.createOrReplaceTempView("applySchema1") |
| val df2 = df1.toDF() |
| val result = df2.toJSON.collect() |
| // scalastyle:off |
| assert(result(0) === "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}") |
| assert(result(3) === "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}") |
| // scalastyle:on |
| |
| val schema2 = StructType( |
| StructField("f1", StructType( |
| StructField("f11", IntegerType, false) :: |
| StructField("f12", BooleanType, false) :: Nil), false) :: |
| StructField("f2", MapType(StringType, IntegerType, true), false) :: Nil) |
| |
| val rowRDD2 = unparsedStrings.map { r => |
| val values = r.split(",").map(_.trim) |
| val v4 = try values(3).toInt catch { |
| case _: NumberFormatException => null |
| } |
| Row(Row(values(0).toInt, values(2).toBoolean), Map(values(1) -> v4)) |
| } |
| |
| val df3 = spark.createDataFrame(rowRDD2, schema2) |
| df3.createOrReplaceTempView("applySchema2") |
| val df4 = df3.toDF() |
| val result2 = df4.toJSON.collect() |
| |
| assert(result2(1) === "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}") |
| assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}") |
| |
| val jsonDF = spark.read.json(primitiveFieldAndType) |
| val primTable = spark.read.json(jsonDF.toJSON) |
| primTable.createOrReplaceTempView("primitiveTable") |
| checkAnswer( |
| sql("select * from primitiveTable"), |
| Row(new java.math.BigDecimal("92233720368547758070"), |
| true, |
| 1.7976931348623157, |
| 10, |
| 21474836470L, |
| "this is a simple string.") |
| ) |
| |
| val complexJsonDF = spark.read.json(complexFieldAndType1) |
| val compTable = spark.read.json(complexJsonDF.toJSON) |
| compTable.createOrReplaceTempView("complexTable") |
| // Access elements of a primitive array. |
| checkAnswer( |
| sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from complexTable"), |
| Row("str1", "str2", null) |
| ) |
| |
| // Access an array of null values. |
| checkAnswer( |
| sql("select arrayOfNull from complexTable"), |
| Row(Seq(null, null, null, null)) |
| ) |
| |
| // Access elements of a BigInteger array (we use DecimalType internally). |
| checkAnswer( |
| sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] " + |
| " from complexTable"), |
| Row(new java.math.BigDecimal("922337203685477580700"), |
| new java.math.BigDecimal("-922337203685477580800"), null) |
| ) |
| |
| // Access elements of an array of arrays. |
| checkAnswer( |
| sql("select arrayOfArray1[0], arrayOfArray1[1] from complexTable"), |
| Row(Seq("1", "2", "3"), Seq("str1", "str2")) |
| ) |
| |
| // Access elements of an array of arrays. |
| checkAnswer( |
| sql("select arrayOfArray2[0], arrayOfArray2[1] from complexTable"), |
| Row(Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1)) |
| ) |
| |
| // Access elements of an array inside a field of type ArrayType(ArrayType). |
| checkAnswer( |
| sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from complexTable"), |
| Row("str2", 2.1) |
| ) |
| |
| // Access a struct and fields inside of it. |
| checkAnswer( |
| sql("select struct, struct.field1, struct.field2 from complexTable"), |
| Row( |
| Row(true, new java.math.BigDecimal("92233720368547758070")), |
| true, |
| new java.math.BigDecimal("92233720368547758070")) :: Nil |
| ) |
| |
| // Access an array field of a struct. |
| checkAnswer( |
| sql("select structWithArrayFields.field1, structWithArrayFields.field2 from complexTable"), |
| Row(Seq(4, 5, 6), Seq("str1", "str2")) |
| ) |
| |
| // Access elements of an array field of a struct. |
| checkAnswer( |
| sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] " + |
| "from complexTable"), |
| Row(5, null) |
| ) |
| } |
| } |
| |
| test("Dataset toJSON doesn't construct rdd") { |
| val containsRDD = spark.emptyDataFrame.toJSON.queryExecution.logical.exists { |
| case ExternalRDD(_, _) => true |
| case _ => false |
| } |
| |
| assert(!containsRDD, "Expected logical plan of toJSON to not contain an RDD") |
| } |
| |
| test("JSONRelation equality test") { |
| withTempPath(dir => { |
| val path = dir.getCanonicalFile.toURI.toString |
| sparkContext.parallelize(1 to 100) |
| .map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path) |
| |
| val d1 = DataSource( |
| spark, |
| userSpecifiedSchema = None, |
| partitionColumns = Array.empty[String].toImmutableArraySeq, |
| bucketSpec = None, |
| className = classOf[JsonFileFormat].getCanonicalName, |
| options = Map("path" -> path)).resolveRelation() |
| |
| val d2 = DataSource( |
| spark, |
| userSpecifiedSchema = None, |
| partitionColumns = Array.empty[String].toImmutableArraySeq, |
| bucketSpec = None, |
| className = classOf[JsonFileFormat].getCanonicalName, |
| options = Map("path" -> path)).resolveRelation() |
| assert(d1 === d2) |
| }) |
| } |
| |
| test("SPARK-6245 JsonInferSchema.infer on empty RDD") { |
| // This mainly verifies that inferring the schema of an empty dataset does not throw. |
| val options = new JSONOptions(Map.empty[String, String], "UTC") |
| val emptySchema = new JsonInferSchema(options).infer( |
| empty.rdd, |
| CreateJacksonParser.string) |
| assert(StructType(Seq()) === emptySchema) |
| } |
| |
| test("SPARK-7565 MapType in JsonRDD") { |
| withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { |
| withTempDir { dir => |
| val schemaWithSimpleMap = StructType( |
| StructField("map", MapType(StringType, IntegerType, true), false) :: Nil) |
| val df = spark.read.schema(schemaWithSimpleMap).json(mapType1) |
| |
| val path = dir.getAbsolutePath |
| df.write.mode("overwrite").parquet(path) |
| // The ordering of map keys is not defined, so only check the row count. |
| assert(spark.read.parquet(path).count() == 5) |
| |
| val df2 = spark.read.json(corruptRecords) |
| df2.write.mode("overwrite").parquet(path) |
| checkAnswer(spark.read.parquet(path), df2.collect()) |
| } |
| } |
| } |
| |
| test("SPARK-8093 Erase empty structs") { |
| val options = new JSONOptions(Map.empty[String, String], "UTC") |
| val emptySchema = new JsonInferSchema(options).infer( |
| emptyRecords.rdd, |
| CreateJacksonParser.string) |
| assert(StructType(Seq()) === emptySchema) |
| } |
| |
| test("JSON with Partition") { |
| def makePartition(rdd: RDD[String], parent: File, partName: String, partValue: Any): File = { |
| val p = new File(parent, s"$partName=${partValue.toString}") |
| rdd.saveAsTextFile(p.getCanonicalPath) |
| p |
| } |
| |
| withTempPath(root => { |
| withTempView("test_myjson_with_part") { |
| val d1 = new File(root, "d1=1") |
| // root/d1=1/col1=abc |
| val p1_col1 = makePartition( |
| sparkContext.parallelize(2 to 5).map(i => s"""{"a": 1, "b": "str$i"}"""), |
| d1, |
| "col1", |
| "abc") |
| |
| // root/d1=1/col1=abd |
| val p2 = makePartition( |
| sparkContext.parallelize(6 to 10).map(i => s"""{"a": 1, "b": "str$i"}"""), |
| d1, |
| "col1", |
| "abd") |
| |
| spark.read.json(root.getAbsolutePath).createOrReplaceTempView("test_myjson_with_part") |
| checkAnswer(sql( |
| "SELECT count(a) FROM test_myjson_with_part where d1 = 1 and col1='abc'"), Row(4)) |
| checkAnswer(sql( |
| "SELECT count(a) FROM test_myjson_with_part where d1 = 1 and col1='abd'"), Row(5)) |
| checkAnswer(sql( |
| "SELECT count(a) FROM test_myjson_with_part where d1 = 1"), Row(9)) |
| } |
| }) |
| } |
| |
| test("backward compatibility") { |
| // This test makes sure our JSON support can read JSON data generated by previous versions |
| // of Spark through the toJSON method and the JSON data source. |
| // The data is generated by the following program. |
| // Here are a few notes: |
| // - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13) |
| // in the JSON object. |
| // - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to |
| // JSON objects generated by those Spark versions (col17). |
| // - If the type is NullType, we do not write data out. |
| |
| // Create the schema. |
| val struct = |
| StructType( |
| StructField("f1", FloatType, true) :: |
| StructField("f2", ArrayType(BooleanType), true) :: Nil) |
| |
| val dataTypes = |
| Seq( |
| StringType, BinaryType, NullType, BooleanType, |
| ByteType, ShortType, IntegerType, LongType, |
| FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), |
| DateType, TimestampType, |
| ArrayType(IntegerType), MapType(StringType, LongType), struct, |
| new MyDenseVectorUDT()) |
| val fields = dataTypes.zipWithIndex.map { case (dataType, index) => |
| StructField(s"col$index", dataType, nullable = true) |
| } |
| val schema = StructType(fields) |
| |
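| // One constant per entry of `dataTypes` after StringType; col0 (the version string) is |
| // prepended per row below. |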
| val constantValues = |
| Seq( |
| "a string in binary".getBytes(StandardCharsets.UTF_8), |
| null, |
| true, |
| 1.toByte, |
| 2.toShort, |
| 3, |
| Long.MaxValue, |
| 0.25.toFloat, |
| 0.75, |
| new java.math.BigDecimal(s"1234.23456"), |
| new java.math.BigDecimal(s"1.23456"), |
| java.sql.Date.valueOf("2015-01-01"), |
| java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"), |
| Seq(2, 3, 4), |
| Map("a string" -> 2000L), |
| Row(4.75.toFloat, Seq(false, true)), |
| new MyDenseVector(Array(0.25, 2.25, 4.25))) |
| val data = |
| Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil |
| |
| // Data generated by previous versions. |
| // scalastyle:off |
| val existingJSONData = |
| """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: |
| """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: |
| """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: |
| """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: |
| """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: |
| """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil |
| // scalastyle:on |
| |
| // Generate data for the current version. |
| val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema) |
| withTempPath { path => |
| df.write.format("json").mode("overwrite").save(path.getCanonicalPath) |
| |
| // df.toJSON converts internal rows to external rows first and then generates JSON |
| // objects, whereas df.write.format("json") writes internal rows directly. |
| val allJSON = |
| existingJSONData ++ |
| df.toJSON.collect() ++ |
| sparkContext.textFile(path.getCanonicalPath).collect() |
| |
| Utils.deleteRecursively(path) |
| sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath) |
| |
| // Read data back with the schema specified. |
| val col0Values = |
| Seq( |
| "Spark 1.2.2", |
| "Spark 1.3.1", |
| "Spark 1.3.1", |
| "Spark 1.4.1", |
| "Spark 1.4.1", |
| "Spark 1.5.0", |
| "Spark " + spark.sparkContext.version, |
| "Spark " + spark.sparkContext.version) |
| val expectedResult = col0Values.map { v => |
| Row.fromSeq(Seq(v) ++ constantValues) |
| } |
| checkAnswer( |
| spark.read.format("json").schema(schema).load(path.getCanonicalPath), |
| expectedResult |
| ) |
| } |
| } |
| |
| test("SPARK-11544 test pathfilter") { |
| withTempPath { dir => |
| val path = dir.getCanonicalPath |
| |
| val df = spark.range(2) |
| df.write.json(path + "/p=1") |
| df.write.json(path + "/p=2") |
| assert(spark.read.json(path).count() === 4) |
| |
| val extraOptions = Map( |
| "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName, |
| "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName |
| ) |
| assert(spark.read.options(extraOptions).json(path).count() === 2) |
| |
| withClue("SPARK-32621: 'path' option can cause issues while inferring schema") { |
| // During infer, "path" option is used again on top of the paths that have already been |
| // listed. When a partition is removed by TestFileFilter, this will cause a conflict while |
| // inferring partitions because the original path in the "path" option will list the |
| // partition directory that has been removed. |
| assert( |
| spark.read.options(extraOptions).format("json").option("path", path).load().count() === 2) |
| } |
| } |
| } |
| |
| test("SPARK-12057 additional corrupt records do not throw exceptions") { |
| // Test if we can query corrupt records. |
| withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { |
| withTempView("jsonTable") { |
| val schema = StructType( |
| StructField("_unparsed", StringType, true) :: |
| StructField("dummy", StringType, true) :: Nil) |
| |
| { |
| // We need to make sure we can infer the schema. |
| val jsonDF = spark.read.json(additionalCorruptRecords) |
| assert(jsonDF.schema === schema) |
| } |
| |
| { |
| val jsonDF = spark.read.schema(schema).json(additionalCorruptRecords) |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
          // In HiveContext, backticks should be used to access columns starting with an underscore.
| checkAnswer( |
| sql( |
| """ |
| |SELECT dummy, _unparsed |
| |FROM jsonTable |
| """.stripMargin), |
| Row("test", null) :: |
| Row(null, """[1,2,3]""") :: |
| Row(null, """":"test", "a":1}""") :: |
| Row(null, """42""") :: |
| Row(null, """ ","ian":"test"}""") :: Nil |
| ) |
| } |
| } |
| } |
| } |
| |
| test("Parse JSON rows having an array type and a struct type in the same field.") { |
| withTempDir { dir => |
      // Clear the directory provided by withTempDir so the writer can create the path itself.
      dir.delete()
| val path = dir.getCanonicalPath |
| arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).write.text(path) |
| |
| val schema = |
| StructType( |
| StructField("a", StructType( |
| StructField("b", StringType) :: Nil |
| )) :: Nil) |
| val jsonDF = spark.read.schema(schema).json(path) |
| assert(jsonDF.count() == 2) |
| } |
| } |
| |
| test("SPARK-12872 Support to specify the option for compression codec") { |
| withTempDir { dir => |
      // Clear the directory provided by withTempDir so the writer can create the path itself.
      dir.delete()
| val path = dir.getCanonicalPath |
| primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path) |
| |
| val jsonDF = spark.read.json(path) |
| val jsonDir = new File(dir, "json").getCanonicalPath |
| jsonDF.coalesce(1).write |
| .format("json") |
| .option("compression", "gZiP") |
| .save(jsonDir) |
| |
| val compressedFiles = new File(jsonDir).listFiles() |
| assert(compressedFiles.exists(_.getName.endsWith(".json.gz"))) |
| |
| val jsonCopy = spark.read |
| .format("json") |
| .load(jsonDir) |
| |
| assert(jsonCopy.count() == jsonDF.count()) |
| val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") |
| val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") |
| checkAnswer(jsonCopySome, jsonDFSome) |
| } |
| } |
| |
| test("SPARK-13543 Write the output as uncompressed via option()") { |
| val extraOptions = Map[String, String]( |
| "mapreduce.output.fileoutputformat.compress" -> "true", |
| "mapreduce.output.fileoutputformat.compress.type" -> CompressionType.BLOCK.toString, |
| "mapreduce.output.fileoutputformat.compress.codec" -> classOf[GzipCodec].getName, |
| "mapreduce.map.output.compress" -> "true", |
| "mapreduce.map.output.compress.codec" -> classOf[GzipCodec].getName |
| ) |
| withTempDir { dir => |
      // Clear the directory provided by withTempDir so the writer can create the path itself.
      dir.delete()
| |
| val path = dir.getCanonicalPath |
| primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path) |
| |
| val jsonDF = spark.read.json(path) |
| val jsonDir = new File(dir, "json").getCanonicalPath |
| jsonDF.coalesce(1).write |
| .format("json") |
| .option("compression", HadoopCompressionCodec.NONE.lowerCaseName()) |
| .options(extraOptions) |
| .save(jsonDir) |
| |
| val compressedFiles = new File(jsonDir).listFiles() |
| assert(compressedFiles.exists(!_.getName.endsWith(".json.gz"))) |
| |
| val jsonCopy = spark.read |
| .format("json") |
| .options(extraOptions) |
| .load(jsonDir) |
| |
| assert(jsonCopy.count() == jsonDF.count()) |
| val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") |
| val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") |
| checkAnswer(jsonCopySome, jsonDFSome) |
| } |
| } |
| |
| test("Casting long as timestamp") { |
| withTempView("jsonTable") { |
| val schema = (new StructType).add("ts", TimestampType) |
| val jsonDF = spark.read.schema(schema).json(timestampAsLong) |
| |
| jsonDF.createOrReplaceTempView("jsonTable") |
| |
| checkAnswer( |
| sql("select ts from jsonTable"), |
| Row(java.sql.Timestamp.valueOf("2016-01-02 03:04:05")) |
| ) |
| } |
| } |
| |
| test("wide nested json table") { |
| val nested = (1 to 100).map { i => |
| s""" |
| |"c$i": $i |
| """.stripMargin |
| }.mkString(", ") |
| val json = s""" |
| |{"a": [{$nested}], "b": [{$nested}]} |
| """.stripMargin |
| val df = spark.read.json(Seq(json).toDS()) |
| assert(df.schema.size === 2) |
| df.collect() |
| } |
| |
| test("Write dates correctly with dateFormat option") { |
| val customSchema = new StructType(Array(StructField("date", DateType, true))) |
| withTempDir { dir => |
| // With dateFormat option. |
| val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.json" |
| val datesWithFormat = spark.read |
| .schema(customSchema) |
| .option("dateFormat", "dd/MM/yyyy HH:mm") |
| .json(datesRecords) |
| |
| datesWithFormat.write |
| .format("json") |
| .option("dateFormat", "yyyy/MM/dd") |
| .save(datesWithFormatPath) |
| |
      // This will load back the dates as strings.
| val stringSchema = StructType(StructField("date", StringType, true) :: Nil) |
| val stringDatesWithFormat = spark.read |
| .schema(stringSchema) |
| .json(datesWithFormatPath) |
| val expectedStringDatesWithFormat = Seq( |
| Row("2015/08/26"), |
| Row("2014/10/27"), |
| Row("2016/01/28")) |
| |
| checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat) |
| } |
| } |
| |
| test("Write timestamps correctly with timestampFormat option") { |
| val customSchema = new StructType(Array(StructField("date", TimestampType, true))) |
| withTempDir { dir => |
      // With timestampFormat option.
| val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json" |
| val timestampsWithFormat = spark.read |
| .schema(customSchema) |
| .option("timestampFormat", "dd/MM/yyyy HH:mm") |
| .json(datesRecords) |
| timestampsWithFormat.write |
| .format("json") |
| .option("timestampFormat", "yyyy/MM/dd HH:mm") |
| .save(timestampsWithFormatPath) |
| |
      // This will load back the timestamps as strings.
| val stringSchema = StructType(StructField("date", StringType, true) :: Nil) |
| val stringTimestampsWithFormat = spark.read |
| .schema(stringSchema) |
| .json(timestampsWithFormatPath) |
      val expectedStringTimestampsWithFormat = Seq(
        Row("2015/08/26 18:00"),
        Row("2014/10/27 18:30"),
        Row("2016/01/28 20:00"))

      checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
| } |
| } |
| |
| test("Write timestamps correctly with timestampFormat option and timeZone option") { |
| val customSchema = new StructType(Array(StructField("date", TimestampType, true))) |
| withTempDir { dir => |
      // With timestampFormat and timeZone options.
| val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json" |
| val timestampsWithFormat = spark.read |
| .schema(customSchema) |
| .option("timestampFormat", "dd/MM/yyyy HH:mm") |
| .json(datesRecords) |
| timestampsWithFormat.write |
| .format("json") |
| .option("timestampFormat", "yyyy/MM/dd HH:mm") |
| .option(DateTimeUtils.TIMEZONE_OPTION, "UTC") |
| .save(timestampsWithFormatPath) |
| |
      // This will load back the timestamps as strings.
| val stringSchema = StructType(StructField("date", StringType, true) :: Nil) |
| val stringTimestampsWithFormat = spark.read |
| .schema(stringSchema) |
| .json(timestampsWithFormatPath) |
      val expectedStringTimestampsWithFormat = Seq(
        Row("2015/08/27 01:00"),
        Row("2014/10/28 01:30"),
        Row("2016/01/29 04:00"))

      checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
| |
| val readBack = spark.read |
| .schema(customSchema) |
| .option("timestampFormat", "yyyy/MM/dd HH:mm") |
| .option(DateTimeUtils.TIMEZONE_OPTION, "UTC") |
| .json(timestampsWithFormatPath) |
| |
| checkAnswer(readBack, timestampsWithFormat) |
| } |
| } |
| |
| test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { |
| val records = Seq("""{"a": 3, "b": 1.1}""", """{"a": 3.1, "b": 0.000001}""").toDS() |
| |
| val schema = StructType( |
| StructField("a", DecimalType(21, 1), true) :: |
| StructField("b", DecimalType(7, 6), true) :: Nil) |
| |
| val df1 = spark.read.option("prefersDecimal", "true").json(records) |
| assert(df1.schema == schema) |
| val df2 = spark.read.option("PREfersdecimaL", "true").json(records) |
| assert(df2.schema == schema) |
| } |
| |
| test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { |
| withTempPath { dir => |
| val path = dir.getCanonicalPath |
| primitiveFieldAndType |
| .toDF("value") |
| .write |
| .option("compression", "GzIp") |
| .text(path) |
| |
| assert(new File(path).listFiles().exists(_.getName.endsWith(".gz"))) |
| |
| val jsonDF = spark.read.option("multiLine", true).json(path) |
| val jsonDir = new File(dir, "json").getCanonicalPath |
| jsonDF.coalesce(1).write |
| .option("compression", "gZiP") |
| .json(jsonDir) |
| |
| assert(new File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz"))) |
| |
| val originalData = spark.read.json(primitiveFieldAndType) |
| checkAnswer(jsonDF, originalData) |
| checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) |
| } |
| } |
| |
| test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { |
| withTempPath { dir => |
| val path = dir.getCanonicalPath |
| primitiveFieldAndType |
| .toDF("value") |
| .write |
| .text(path) |
| |
| val jsonDF = spark.read.option("multiLine", true).json(path) |
| val jsonDir = new File(dir, "json").getCanonicalPath |
| jsonDF.coalesce(1).write.json(jsonDir) |
| |
| val compressedFiles = new File(jsonDir).listFiles() |
| assert(compressedFiles.exists(_.getName.endsWith(".json"))) |
| |
| val originalData = spark.read.json(primitiveFieldAndType) |
| checkAnswer(jsonDF, originalData) |
| checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) |
| } |
| } |
| |
| test("SPARK-18352: Expect one JSON document per file") { |
    // The JSON parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token.
    // This might not be the optimal behavior, but this test verifies that only the first value
    // is parsed and the rest are discarded.

    // Alternatively, the parser could continue parsing the following objects, which might
    // further reduce allocations by skipping the line reader entirely.
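
    // A rough sketch of the underlying Jackson behavior (illustrative only, not exercised):
    //   val parser = new JsonFactory().createParser("{}{invalid}")
    //   parser.nextToken()  // JsonToken.START_OBJECT
    //   parser.nextToken()  // JsonToken.END_OBJECT -- the first value ends here, so
    //                       // "{invalid}" is never tokenized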
| |
| withTempPath { dir => |
| val path = dir.getCanonicalPath |
| spark |
| .createDataFrame(Seq(Tuple1("{}{invalid}"))) |
| .coalesce(1) |
| .write |
| .text(path) |
| |
| val jsonDF = spark.read.option("multiLine", true).json(path) |
| // no corrupt record column should be created |
| assert(jsonDF.schema === StructType(Seq())) |
| // only the first object should be read |
| assert(jsonDF.count() === 1) |
| } |
| } |
| |
| test("SPARK-45035: json enable ignoreCorruptFiles/ignoreMissingFiles") { |
| withCorruptFile(inputFile => { |
| withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { |
| val e = intercept[SparkException] { |
| spark.read.json(inputFile.toURI.toString).collect() |
| } |
| checkError( |
| exception = e, |
| errorClass = "FAILED_READ_FILE.NO_HINT", |
| parameters = Map("path" -> inputFile.toPath.toUri.toString)) |
| assert(e.getCause.isInstanceOf[EOFException]) |
| assert(e.getCause.getMessage === "Unexpected end of input stream") |
| val e2 = intercept[SparkException] { |
| spark.read.option("multiLine", true).json(inputFile.toURI.toString).collect() |
| } |
| assert(e2.getCause.isInstanceOf[EOFException]) |
| assert(e2.getCause.getMessage === "Unexpected end of input stream") |
| } |
| withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { |
| assert(spark.read.json(inputFile.toURI.toString).collect().isEmpty) |
| assert(spark.read.option("multiLine", true).json(inputFile.toURI.toString).collect() |
| .isEmpty) |
| } |
| }) |
| withTempPath { dir => |
| val jsonPath = new Path(dir.getCanonicalPath, "json") |
| val fs = jsonPath.getFileSystem(spark.sessionState.newHadoopConf()) |
| |
| sampledTestData.write.json(jsonPath.toString) |
| val df = spark.read.option("multiLine", true).json(jsonPath.toString) |
| fs.delete(jsonPath, true) |
| withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") { |
| checkErrorMatchPVals( |
| exception = intercept[SparkException] { |
| df.collect() |
| }, |
| errorClass = "FAILED_READ_FILE.FILE_NOT_EXIST", |
| parameters = Map("path" -> s".*$dir.*") |
| ) |
| } |
| |
| sampledTestData.write.json(jsonPath.toString) |
| val df2 = spark.read.option("multiLine", true).json(jsonPath.toString) |
| fs.delete(jsonPath, true) |
| withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") { |
| assert(df2.collect().isEmpty) |
| } |
| } |
| } |
| |
| test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") { |
| withTempPath { dir => |
| val path = dir.getCanonicalPath |
| val corruptRecordCount = additionalCorruptRecords.count().toInt |
| assert(corruptRecordCount === 5) |
| |
| additionalCorruptRecords |
| .toDF("value") |
| // this is the minimum partition count that avoids hash collisions |
| .repartition(corruptRecordCount * 4, F.hash($"value")) |
| .write |
| .text(path) |
| |
| val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path) |
| assert(jsonDF.count() === corruptRecordCount) |
| assert(jsonDF.schema === new StructType() |
| .add("_corrupt_record", StringType) |
| .add("dummy", StringType)) |
| val counts = jsonDF |
| .join( |
| additionalCorruptRecords.toDF("value"), |
| F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === F.trim($"value"), |
| "outer") |
| .agg( |
| F.count($"dummy").as("valid"), |
| F.count($"_corrupt_record").as("corrupt"), |
| F.count("*").as("count")) |
| checkAnswer(counts, Row(1, 4, 6)) |
| } |
| } |
| |
| test("SPARK-19641: Handle multi-line corrupt documents (DROPMALFORMED)") { |
| withTempPath { dir => |
| val path = dir.getCanonicalPath |
| val corruptRecordCount = additionalCorruptRecords.count().toInt |
| assert(corruptRecordCount === 5) |
| |
| additionalCorruptRecords |
| .toDF("value") |
| // this is the minimum partition count that avoids hash collisions |
| .repartition(corruptRecordCount * 4, F.hash($"value")) |
| .write |
| .text(path) |
| |
| val jsonDF = spark.read.option("multiLine", true).option("mode", "DROPMALFORMED").json(path) |
| checkAnswer(jsonDF, Seq(Row("test"))) |
| } |
| } |
| |
| test("SPARK-18352: Handle multi-line corrupt documents (FAILFAST)") { |
| withTempPath { dir => |
| val path = dir.getCanonicalPath |
| val corruptRecordCount = additionalCorruptRecords.count().toInt |
| assert(corruptRecordCount === 5) |
| |
| additionalCorruptRecords |
| .toDF("value") |
| // this is the minimum partition count that avoids hash collisions |
| .repartition(corruptRecordCount * 4, F.hash($"value")) |
| .write |
| .text(path) |
| |
| val schema = new StructType().add("dummy", StringType) |
| |
| // `FAILFAST` mode should throw an exception for corrupt records. |
| checkErrorMatchPVals( |
| exception = intercept[SparkException] { |
| spark.read |
| .option("multiLine", true) |
| .option("mode", "FAILFAST") |
| .json(path) |
| }, |
| errorClass = "_LEGACY_ERROR_TEMP_2167", |
| parameters = Map("failFastMode" -> "FAILFAST", "dataType" -> "string|bigint")) |
| |
| val ex = intercept[SparkException] { |
| spark.read |
| .option("multiLine", true) |
| .option("mode", "FAILFAST") |
| .schema(schema) |
| .json(path) |
| .collect() |
| } |
| checkErrorMatchPVals( |
| exception = ex, |
| errorClass = "FAILED_READ_FILE.NO_HINT", |
| parameters = Map("path" -> s".*$path.*")) |
| checkError( |
| exception = ex.getCause.asInstanceOf[SparkException], |
| errorClass = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION", |
| parameters = Map( |
| "badRecord" -> "[null]", |
| "failFastMode" -> "FAILFAST") |
| ) |
| } |
| } |
| |
| test("Throw an exception if a `columnNameOfCorruptRecord` field violates requirements") { |
| val columnNameOfCorruptRecord = "_unparsed" |
| val schema = StructType( |
| StructField(columnNameOfCorruptRecord, IntegerType, true) :: |
| StructField("a", StringType, true) :: |
| StructField("b", StringType, true) :: |
| StructField("c", StringType, true) :: Nil) |
| |
| checkError( |
| exception = intercept[AnalysisException] { |
| spark.read |
| .option("mode", "Permissive") |
| .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) |
| .schema(schema) |
| .json(corruptRecords) |
| }, |
| errorClass = "_LEGACY_ERROR_TEMP_1097", |
| parameters = Map.empty |
| ) |
| |
    // We use `PERMISSIVE` mode by default if an invalid mode string is given.
| withTempPath { dir => |
| val path = dir.getCanonicalPath |
| corruptRecords.toDF("value").write.text(path) |
| checkError( |
| exception = intercept[AnalysisException] { |
| spark.read |
| .option("mode", "permm") |
| .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) |
| .schema(schema) |
| .json(path) |
| .collect() |
| }, |
| errorClass = "_LEGACY_ERROR_TEMP_1097", |
| parameters = Map.empty |
| ) |
| } |
| } |
| |
| test("SPARK-18772: Parse special floats correctly") { |
| val jsonFieldValues = Seq( |
| ("""{"a": "NaN"}""", """'NaN'"""), |
| ("""{"a": "+INF"}""", """'+INF'"""), |
| ("""{"a": "-INF"}""", """'-INF'"""), |
| ("""{"a": "Infinity"}""", """'Infinity'"""), |
| ("""{"a": "+Infinity"}""", """'+Infinity'"""), |
| ("""{"a": "-Infinity"}""", """'-Infinity'""")) |
| |
| // positive cases |
| val checks: Seq[Double => Boolean] = Seq( |
| _.isNaN, |
| _.isPosInfinity, |
| _.isNegInfinity, |
| _.isPosInfinity, |
| _.isPosInfinity, |
| _.isNegInfinity) |
| |
| Seq(FloatType, DoubleType).foreach { dt => |
| jsonFieldValues.zip(checks).foreach { case (jsonFieldValue, check) => |
| val ds = spark.read |
| .schema(StructType(Seq(StructField("a", dt)))) |
| .json(Seq(jsonFieldValue._1).toDS()) |
| .select($"a".cast(DoubleType)).as[Double] |
| assert(check(ds.first())) |
| } |
| } |
| |
| // negative cases |
| Seq(FloatType, DoubleType).foreach { dt => |
| val lowerCasedJsonFieldValues = jsonFieldValues.map(j => |
| (j._1.toLowerCase(Locale.ROOT), j._2.toLowerCase(Locale.ROOT))) |
      // The special floats are case-sensitive, so the lower-cased variants below throw exceptions.
| lowerCasedJsonFieldValues.foreach { lowerCasedJsonFieldValue => |
| checkError( |
| exception = intercept[SparkRuntimeException] { |
| spark.read |
| .option("mode", "FAILFAST") |
| .schema(StructType(Seq(StructField("a", dt)))) |
| .json(Seq(lowerCasedJsonFieldValue._1).toDS()) |
| .collect() |
| }, |
| errorClass = "MALFORMED_RECORD_IN_PARSING.CANNOT_PARSE_STRING_AS_DATATYPE", |
| parameters = Map( |
| "failFastMode" -> "FAILFAST", |
| "badRecord" -> lowerCasedJsonFieldValue._1, |
| "fieldValue" -> lowerCasedJsonFieldValue._2, |
| "fieldName" -> "`a`", |
| "targetType" -> dt.toString, |
| "inputType" -> "StringType") |
| ) |
| } |
| } |
| } |
| |
| test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " + |
| "from a file") { |
| withTempPath { dir => |
| val path = dir.getCanonicalPath |
| val data = |
| """{"field": 1} |
| |{"field": 2} |
| |{"field": "3"}""".stripMargin |
| Seq(data).toDF().repartition(1).write.text(path) |
| val schema = new StructType().add("field", ByteType).add("_corrupt_record", StringType) |
| // negative cases |
| checkError( |
| exception = intercept[AnalysisException] { |
| spark.read.schema(schema).json(path).select("_corrupt_record").collect() |
| }, |
| errorClass = "_LEGACY_ERROR_TEMP_1285", |
| parameters = Map.empty |
| ) |
| |
| // workaround |
| val df = spark.read.schema(schema).json(path).cache() |
| assert(df.filter($"_corrupt_record".isNotNull).count() == 1) |
| assert(df.filter($"_corrupt_record".isNull).count() == 2) |
| checkAnswer( |
| df.select("_corrupt_record"), |
| Row(null) :: Row(null) :: Row("{\"field\": \"3\"}") :: Nil |
| ) |
| } |
| } |
| |
| def testLineSeparator(lineSep: String): Unit = { |
| test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") { |
| // Read |
| val data = |
| s""" |
| | {"f": |
| |"a", "f0": 1}$lineSep{"f": |
| | |
| |"c", "f0": 2}$lineSep{"f": "d", "f0": 3} |
| """.stripMargin |
| val dataWithTrailingLineSep = s"$data$lineSep" |
| |
| Seq(data, dataWithTrailingLineSep).foreach { lines => |
| withTempPath { path => |
| Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8)) |
| val df = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) |
| val expectedSchema = |
| StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil) |
| checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF()) |
| assert(df.schema === expectedSchema) |
| } |
| } |
| |
| // Write |
| withTempPath { path => |
| Seq("a", "b", "c").toDF("value").coalesce(1) |
| .write.option("lineSep", lineSep).json(path.getAbsolutePath) |
| val partFile = TestUtils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head |
| val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8) |
| assert( |
| readBack === s"""{"value":"a"}$lineSep{"value":"b"}$lineSep{"value":"c"}$lineSep""") |
| } |
| |
| // Roundtrip |
| withTempPath { path => |
| val df = Seq("a", "b", "c").toDF() |
| df.write.option("lineSep", lineSep).json(path.getAbsolutePath) |
| val readBack = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) |
| checkAnswer(df, readBack) |
| } |
| } |
| } |
| |
| // scalastyle:off nonascii |
| Seq("|", "^", "::", "!!!@3", 0x1E.toChar.toString, "ì•„").foreach { lineSep => |
| testLineSeparator(lineSep) |
| } |
| // scalastyle:on nonascii |
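  // The separators above cover single- and multi-character ASCII strings, a control
  // character (0x1E, the ASCII record separator), and a non-ASCII separator.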
| |
| test("""SPARK-21289: Support line separator - default value \r, \r\n and \n""") { |
| val data = |
| "{\"f\": \"a\", \"f0\": 1}\r{\"f\": \"c\", \"f0\": 2}\r\n{\"f\": \"d\", \"f0\": 3}\n" |
| |
| withTempPath { path => |
| Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8)) |
| val df = spark.read.json(path.getAbsolutePath) |
| val expectedSchema = |
| StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil) |
| checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF()) |
| assert(df.schema === expectedSchema) |
| } |
| } |
| |
| test("SPARK-23849: schema inferring touches less data if samplingRatio < 1.0") { |
    // Set default values for the DataSource parameters to make sure
    // that the whole test file is mapped to only one partition. This guarantees
    // reliable sampling of the input file.
| withSQLConf( |
| SQLConf.FILES_MAX_PARTITION_BYTES.key -> (128 * 1024 * 1024).toString, |
| SQLConf.FILES_OPEN_COST_IN_BYTES.key -> (4 * 1024 * 1024).toString |
| )(withTempPath { path => |
| val ds = sampledTestData.coalesce(1) |
| ds.write.text(path.getAbsolutePath) |
| val readback1 = spark.read.option("samplingRatio", 0.1).json(path.getCanonicalPath) |
| assert(readback1.schema == new StructType().add("f1", LongType)) |
| |
| withClue("SPARK-32621: 'path' option can cause issues while inferring schema") { |
| // During infer, "path" option gets added again to the paths that have already been listed. |
| // This results in reading more data than necessary and causes different schema to be |
| // inferred when sampling ratio is involved. |
| val readback2 = spark.read |
| .option("samplingRatio", 0.1).option("path", path.getCanonicalPath) |
| .format("json").load() |
| assert(readback2.schema == new StructType().add("f1", LongType)) |
| } |
| }) |
| } |
| |
| test("SPARK-23849: usage of samplingRatio while parsing a dataset of strings") { |
| val ds = sampledTestData.coalesce(1) |
| val readback = spark.read.option("samplingRatio", 0.1).json(ds) |
| |
| assert(readback.schema == new StructType().add("f1", LongType)) |
| } |
| |
| test("SPARK-23849: samplingRatio is out of the range (0, 1.0]") { |
| val ds = spark.range(0, 100, 1, 1).map(_.toString) |
| |
| val errorMsg0 = intercept[IllegalArgumentException] { |
| spark.read.option("samplingRatio", -1).json(ds) |
| }.getMessage |
| assert(errorMsg0.contains("samplingRatio (-1.0) should be greater than 0")) |
| |
| val errorMsg1 = intercept[IllegalArgumentException] { |
| spark.read.option("samplingRatio", 0).json(ds) |
| }.getMessage |
| assert(errorMsg1.contains("samplingRatio (0.0) should be greater than 0")) |
| |
| val sampled = spark.read.option("samplingRatio", 1.0).json(ds) |
| assert(sampled.count() == ds.count()) |
| } |
| |
| test("SPARK-23723: json in UTF-16 with BOM") { |
| val fileName = "test-data/utf16WithBOM.json" |
| val schema = new StructType().add("firstName", StringType).add("lastName", StringType) |
| val jsonDF = spark.read.schema(schema) |
| .option("multiline", "true") |
| .option("encoding", "UTF-16") |
| .json(testFile(fileName)) |
| |
| checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) |
| } |
| |
| test("SPARK-23723: multi-line json in UTF-32BE with BOM") { |
| val fileName = "test-data/utf32BEWithBOM.json" |
| val schema = new StructType().add("firstName", StringType).add("lastName", StringType) |
| val jsonDF = spark.read.schema(schema) |
| .option("multiline", "true") |
| .json(testFile(fileName)) |
| |
| checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) |
| } |
| |
| test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { |
| val fileName = "test-data/utf16LE.json" |
| val schema = new StructType().add("firstName", StringType).add("lastName", StringType) |
| val jsonDF = spark.read.schema(schema) |
| .option("multiline", "true") |
| .options(Map("encoding" -> "UTF-16LE")) |
| .json(testFile(fileName)) |
| |
| checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) |
| } |
| |
| test("SPARK-23723: Unsupported encoding name") { |
| val invalidCharset = "UTF-128" |
| val exception = intercept[UnsupportedCharsetException] { |
| spark.read |
| .options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) |
| .json(testFile("test-data/utf16LE.json")) |
| .count() |
| } |
| |
| assert(exception.getMessage.contains(invalidCharset)) |
| } |
| |
| test("SPARK-23723: checking that the encoding option is case agnostic") { |
| val fileName = "test-data/utf16LE.json" |
| val schema = new StructType().add("firstName", StringType).add("lastName", StringType) |
| val jsonDF = spark.read.schema(schema) |
| .option("multiline", "true") |
| .options(Map("encoding" -> "uTf-16lE")) |
| .json(testFile(fileName)) |
| |
| checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) |
| } |
| |
| test("SPARK-23723: specified encoding is not matched to actual encoding") { |
| val fileName = "test-data/utf16LE.json" |
| val schema = new StructType().add("firstName", StringType).add("lastName", StringType) |
| val inputFile = testFile(fileName) |
| val exception = intercept[SparkException] { |
| spark.read.schema(schema) |
| .option("mode", "FAILFAST") |
| .option("multiline", "true") |
| .options(Map("encoding" -> "UTF-16BE")) |
| .json(inputFile) |
| .count() |
| } |
| checkErrorMatchPVals( |
| exception = exception, |
| errorClass = "FAILED_READ_FILE.NO_HINT", |
| parameters = Map("path" -> s".*$fileName.*")) |
| checkError( |
| exception = exception.getCause.asInstanceOf[SparkException], |
| errorClass = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION", |
| parameters = Map("badRecord" -> "[empty row]", "failFastMode" -> "FAILFAST") |
| ) |
| } |
| |
| def checkEncoding(expectedEncoding: String, pathToJsonFiles: String, |
| expectedContent: String): Unit = { |
| val jsonFiles = new File(pathToJsonFiles) |
| .listFiles() |
| .filter(_.isFile) |
| .filter(_.getName.endsWith("json")) |
| val actualContent = jsonFiles.map { file => |
| new String(Files.readAllBytes(file.toPath), expectedEncoding) |
| }.mkString.trim |
| |
| assert(actualContent == expectedContent) |
| } |
| |
| test("SPARK-23723: save json in UTF-32BE") { |
| val encoding = "UTF-32BE" |
| withTempPath { path => |
| val df = spark.createDataset(Seq(("Dog", 42))) |
| df.write |
| .options(Map("encoding" -> encoding)) |
| .json(path.getCanonicalPath) |
| |
| checkEncoding( |
| expectedEncoding = encoding, |
| pathToJsonFiles = path.getCanonicalPath, |
| expectedContent = """{"_1":"Dog","_2":42}""") |
| } |
| } |
| |
| test("SPARK-23723: save json in default encoding - UTF-8") { |
| withTempPath { path => |
| val df = spark.createDataset(Seq(("Dog", 42))) |
| df.write.json(path.getCanonicalPath) |
| |
| checkEncoding( |
| expectedEncoding = "UTF-8", |
| pathToJsonFiles = path.getCanonicalPath, |
| expectedContent = """{"_1":"Dog","_2":42}""") |
| } |
| } |
| |
| test("SPARK-23723: wrong output encoding") { |
| val encoding = "UTF-128" |
| val exception = intercept[SparkException] { |
| withTempPath { path => |
| val df = spark.createDataset(Seq((0))) |
| df.write |
| .options(Map("encoding" -> encoding)) |
| .json(path.getCanonicalPath) |
| } |
| } |
| |
| val baos = new ByteArrayOutputStream() |
| val ps = new PrintStream(baos, true, StandardCharsets.UTF_8.name()) |
| exception.printStackTrace(ps) |
| ps.flush() |
| |
| assert(baos.toString.contains( |
| "java.nio.charset.UnsupportedCharsetException: UTF-128")) |
| } |
| |
| test("SPARK-23723: read back json in UTF-16LE") { |
| val options = Map("encoding" -> "UTF-16LE", "lineSep" -> "\n") |
| withTempPath { path => |
| val ds = spark.createDataset(Seq(("a", 1), ("b", 2), ("c", 3))).repartition(2) |
| ds.write.options(options).json(path.getCanonicalPath) |
| |
| val readBack = spark |
| .read |
| .options(options) |
| .json(path.getCanonicalPath) |
| |
| checkAnswer(readBack.toDF(), ds.toDF()) |
| } |
| } |
| |
| test("SPARK-23723: write json in UTF-16/32 with multiline off") { |
| Seq("UTF-16", "UTF-32").foreach { encoding => |
| withTempPath { path => |
| val ds = spark.createDataset(Seq(("a", 1))).repartition(1) |
| ds.write |
| .option("encoding", encoding) |
| .option("multiline", false) |
| .json(path.getCanonicalPath) |
| val jsonFiles = path.listFiles().filter(_.getName.endsWith("json")) |
| jsonFiles.foreach { jsonFile => |
| val readback = Files.readAllBytes(jsonFile.toPath) |
| val expected = ("""{"_1":"a","_2":1}""" + "\n").getBytes(Charset.forName(encoding)) |
| assert(readback === expected) |
| } |
| } |
| } |
| } |
| |
| def checkReadJson(lineSep: String, encoding: String, inferSchema: Boolean, id: Int): Unit = { |
| test(s"SPARK-23724: checks reading json in ${encoding} #${id}") { |
| val schema = new StructType().add("f1", StringType).add("f2", IntegerType) |
| withTempPath { path => |
| val records = List(("a", 1), ("b", 2)) |
| val data = records |
| .map(rec => s"""{"f1":"${rec._1}", "f2":${rec._2}}""".getBytes(encoding)) |
| .reduce((a1, a2) => a1 ++ lineSep.getBytes(encoding) ++ a2) |
| val os = new FileOutputStream(path) |
| os.write(data) |
| os.close() |
| val reader = if (inferSchema) { |
| spark.read |
| } else { |
| spark.read.schema(schema) |
| } |
| val readBack = reader |
| .option("encoding", encoding) |
| .option("lineSep", lineSep) |
| .json(path.getCanonicalPath) |
| checkAnswer(readBack, records.map(rec => Row(rec._1, rec._2))) |
| } |
| } |
| } |
| |
| // scalastyle:off nonascii |
| List( |
| (0, "|", "UTF-8", false), |
| (1, "^", "UTF-16BE", true), |
| (2, "::", "ISO-8859-1", true), |
| (3, "!!!@3", "UTF-32LE", false), |
| (4, 0x1E.toChar.toString, "UTF-8", true), |
| (5, "ì•„", "UTF-32BE", false), |
| (6, "куку", "CP1251", true), |
| (7, "sep", "utf-8", false), |
| (8, "\r\n", "UTF-16LE", false), |
| (9, "\r\n", "utf-16be", true), |
| (10, "\u000d\u000a", "UTF-32BE", false), |
| (11, "\u000a\u000d", "UTF-8", true), |
| (12, "===", "US-ASCII", false), |
| (13, "$^+", "utf-32le", true) |
| ).foreach { |
| case (testNum, sep, encoding, inferSchema) => checkReadJson(sep, encoding, inferSchema, testNum) |
| } |
| // scalastyle:on nonascii |
| |
| test("SPARK-23724: lineSep should be set if encoding if different from UTF-8") { |
| val encoding = "UTF-16LE" |
| val exception = intercept[IllegalArgumentException] { |
| spark.read |
| .options(Map("encoding" -> encoding)) |
| .json(testFile("test-data/utf16LE.json")) |
| .count() |
| } |
| |
| assert(exception.getMessage.contains( |
| s"""The lineSep option must be specified for the $encoding encoding""")) |
| } |
| |
| private val badJson = "\u0000\u0000\u0000A\u0001AAA" |
| |
| test("SPARK-23094: permissively read JSON file with leading nulls when multiLine is enabled") { |
| withTempPath { tempDir => |
| val path = tempDir.getAbsolutePath |
| Seq(badJson + """{"a":1}""").toDS().write.text(path) |
| val expected = s"""${badJson}{"a":1}\n""" |
| val schema = new StructType().add("a", IntegerType).add("_corrupt_record", StringType) |
| val df = spark.read.format("json") |
| .option("mode", "PERMISSIVE") |
| .option("multiLine", true) |
| .option("encoding", "UTF-8") |
| .schema(schema).load(path) |
| checkAnswer(df, Row(null, expected)) |
| } |
| } |
| |
| test("SPARK-23094: permissively read JSON file with leading nulls when multiLine is disabled") { |
| withTempPath { tempDir => |
| val path = tempDir.getAbsolutePath |
| Seq(badJson, """{"a":1}""").toDS().write.text(path) |
| val schema = new StructType().add("a", IntegerType).add("_corrupt_record", StringType) |
| val df = spark.read.format("json") |
| .option("mode", "PERMISSIVE") |
| .option("multiLine", false) |
| .option("encoding", "UTF-8") |
| .schema(schema).load(path) |
| checkAnswer(df, Seq(Row(1, null), Row(null, badJson))) |
| } |
| } |
| |
| test("SPARK-23094: permissively parse a dataset contains JSON with leading nulls") { |
| checkAnswer( |
| spark.read.option("mode", "PERMISSIVE").option("encoding", "UTF-8").json(Seq(badJson).toDS()), |
| Row(badJson)) |
| checkAnswer( |
      // Encoding auto-detection should also be possible during JSON schema inference.
| spark.read.option("mode", "PERMISSIVE").json(Seq(badJson).toDS()), |
| Row(badJson)) |
| } |
| |
| test("SPARK-37176: inferring should be possible when parse mode is permissive") { |
| withTempPath { tempDir => |
| val path = tempDir.getAbsolutePath |
      // Normal input so that Spark can correctly infer the schema.
| val record = """{"a":1}""" |
| Seq(record, badJson + record).toDS().write.text(path) |
| val expected = s"""${badJson}{"a":1}""" |
| val df = spark.read.format("json") |
| .option("mode", "PERMISSIVE") |
| .load(path) |
| checkAnswer(df, Seq(Row(null, 1), Row(expected, null))) |
| } |
| } |
| |
| test("SPARK-31716: inferring should handle malformed input") { |
| val schema = new StructType().add("a", IntegerType) |
| val dfWithSchema = spark.read.format("json") |
| .option("mode", "DROPMALFORMED") |
| .option("encoding", "utf-8") |
| .schema(schema) |
| .load(testFile("test-data/malformed_utf8.json")) |
| checkAnswer(dfWithSchema, Seq(Row(1), Row(1))) |
| |
| val df = spark.read.format("json") |
| .option("mode", "DROPMALFORMED") |
| .option("encoding", "utf-8") |
| .load(testFile("test-data/malformed_utf8.json")) |
| |
| checkAnswer(df, Seq(Row(1), Row(1))) |
| } |
| |
| test("SPARK-23772 ignore column of all null values or empty array during schema inference") { |
| withTempPath { tempDir => |
| val path = tempDir.getAbsolutePath |
| |
| // primitive types |
| Seq( |
| """{"a":null, "b":1, "c":3.0}""", |
| """{"a":null, "b":null, "c":"string"}""", |
| """{"a":null, "b":null, "c":null}""") |
| .toDS().write.text(path) |
| var df = spark.read.format("json") |
| .option("dropFieldIfAllNull", true) |
| .load(path) |
| var expectedSchema = new StructType() |
| .add("b", LongType).add("c", StringType) |
| assert(df.schema === expectedSchema) |
| checkAnswer(df, Row(1, "3.0") :: Row(null, "string") :: Row(null, null) :: Nil) |
| |
| // arrays |
| Seq( |
| """{"a":[2, 1], "b":[null, null], "c":null, "d":[[], [null]], "e":[[], null, [[]]]}""", |
| """{"a":[null], "b":[null], "c":[], "d":[null, []], "e":null}""", |
| """{"a":null, "b":null, "c":[], "d":null, "e":[null, [], null]}""") |
| .toDS().write.mode("overwrite").text(path) |
| df = spark.read.format("json") |
| .option("dropFieldIfAllNull", true) |
| .load(path) |
| expectedSchema = new StructType() |
| .add("a", ArrayType(LongType)) |
| assert(df.schema === expectedSchema) |
| checkAnswer(df, Row(Array(2, 1)) :: Row(Array(null)) :: Row(null) :: Nil) |
| |
| // structs |
| Seq( |
| """{"a":{"a1": 1, "a2":"string"}, "b":{}}""", |
| """{"a":{"a1": 2, "a2":null}, "b":{"b1":[null]}}""", |
| """{"a":null, "b":null}""") |
| .toDS().write.mode("overwrite").text(path) |
| df = spark.read.format("json") |
| .option("dropFieldIfAllNull", true) |
| .load(path) |
| expectedSchema = new StructType() |
| .add("a", StructType(StructField("a1", LongType) :: StructField("a2", StringType) |
| :: Nil)) |
| assert(df.schema === expectedSchema) |
| checkAnswer(df, Row(Row(1, "string")) :: Row(Row(2, null)) :: Row(null) :: Nil) |
| } |
| } |
| |
| test("SPARK-24190: restrictions for JSONOptions in read") { |
| for (encoding <- Set("UTF-16", "UTF-32")) { |
| val exception = intercept[IllegalArgumentException] { |
| spark.read |
| .option("encoding", encoding) |
| .option("multiLine", false) |
| .json(testFile("test-data/utf16LE.json")) |
| .count() |
| } |
| assert(exception.getMessage.contains("encoding must not be included in the denyList")) |
| } |
| } |
| |
| test("count() for malformed input") { |
| def countForMalformedJSON(expected: Long, input: Seq[String]): Unit = { |
| val schema = new StructType().add("a", StringType) |
| val strings = spark.createDataset(input) |
| val df = spark.read.schema(schema).json(strings) |
| |
| assert(df.count() == expected) |
| } |
| def checkCount(expected: Long): Unit = { |
| val validRec = """{"a":"b"}""" |
| val inputs = Seq( |
| Seq("{-}", validRec), |
| Seq(validRec, "?"), |
| Seq("}", validRec), |
| Seq(validRec, """{"a": [1, 2, 3]}"""), |
| Seq("""{"a": {"a": "b"}}""", validRec) |
| ) |
| inputs.foreach { input => |
| countForMalformedJSON(expected, input) |
| } |
| } |
| |
| checkCount(2) |
| countForMalformedJSON(0, Seq("")) |
| } |
| |
| test("SPARK-26745: count() for non-multiline input with empty lines") { |
| withTempPath { tempPath => |
| val path = tempPath.getCanonicalPath |
| Seq("""{ "a" : 1 }""", "", """ { "a" : 2 }""", " \t ") |
| .toDS() |
| .repartition(1) |
| .write |
| .text(path) |
| assert(spark.read.json(path).count() === 2) |
| } |
| } |
| |
| |
| private def failedOnEmptyString(dataType: DataType): Unit = { |
| val df = spark.read.schema(s"a ${dataType.catalogString}") |
| .option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS()) |
| val e = intercept[SparkException] { df.collect() } |
| checkError( |
| exception = e, |
| errorClass = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION", |
| parameters = Map("badRecord" -> "[null]", "failFastMode" -> "FAILFAST")) |
| checkError( |
| exception = e.getCause.asInstanceOf[SparkRuntimeException], |
| errorClass = "EMPTY_JSON_FIELD_VALUE", |
| parameters = Map("dataType" -> toSQLType(dataType)) |
| ) |
| } |
| |
| private def emptyString(dataType: DataType, expected: Any): Unit = { |
| val df = spark.read.schema(s"a ${dataType.catalogString}") |
| .option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS()) |
| checkAnswer(df, Row(expected) :: Nil) |
| } |
| |
| test("SPARK-25040: empty strings should be disallowed") { |
| failedOnEmptyString(BooleanType) |
| failedOnEmptyString(ByteType) |
| failedOnEmptyString(ShortType) |
| failedOnEmptyString(IntegerType) |
| failedOnEmptyString(LongType) |
| failedOnEmptyString(FloatType) |
| failedOnEmptyString(DoubleType) |
| failedOnEmptyString(DecimalType.SYSTEM_DEFAULT) |
| failedOnEmptyString(TimestampType) |
| failedOnEmptyString(DateType) |
| failedOnEmptyString(ArrayType(IntegerType)) |
| failedOnEmptyString(MapType(StringType, IntegerType, true)) |
| failedOnEmptyString(StructType(StructField("f1", IntegerType, true) :: Nil)) |
| |
| emptyString(StringType, "") |
| emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8)) |
| } |
| |
| test("SPARK-25040: allowing empty strings when legacy config is enabled") { |
| def emptyStringAsNull(dataType: DataType): Unit = { |
| val df = spark.read.schema(s"a ${dataType.catalogString}") |
| .option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS()) |
| checkAnswer(df, Row(null) :: Nil) |
| } |
| |
| // Legacy mode prior to Spark 3.0.0 |
| withSQLConf(SQLConf.LEGACY_ALLOW_EMPTY_STRING_IN_JSON.key -> "true") { |
| emptyStringAsNull(BooleanType) |
| emptyStringAsNull(ByteType) |
| emptyStringAsNull(ShortType) |
| emptyStringAsNull(IntegerType) |
| emptyStringAsNull(LongType) |
| |
| failedOnEmptyString(FloatType) |
| failedOnEmptyString(DoubleType) |
| failedOnEmptyString(TimestampType) |
| failedOnEmptyString(DateType) |
| |
| emptyStringAsNull(DecimalType.SYSTEM_DEFAULT) |
| emptyStringAsNull(ArrayType(IntegerType)) |
| emptyStringAsNull(MapType(StringType, IntegerType, true)) |
| emptyStringAsNull(StructType(StructField("f1", IntegerType, true) :: Nil)) |
| |
| emptyString(StringType, "") |
| emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8)) |
| } |
| } |
| |
| test("return partial result for bad records") { |
| val schema = "a double, b array<int>, c string, _corrupt_record string" |
| val badRecords = Seq( |
| """{"a":"-","b":[0, 1, 2],"c":"abc"}""", |
| """{"a":0.1,"b":{},"c":"def"}""").toDS() |
| val df = spark.read.schema(schema).json(badRecords) |
| |
| checkAnswer( |
| df, |
| Row(null, Array(0, 1, 2), "abc", """{"a":"-","b":[0, 1, 2],"c":"abc"}""") :: |
| Row(0.1, null, "def", """{"a":0.1,"b":{},"c":"def"}""") :: Nil) |
| } |
| |
| test("inferring timestamp type") { |
| def schemaOf(jsons: String*): StructType = { |
| spark.read.option("inferTimestamp", true).json(jsons.toDS()).schema |
| } |
| |
| assert(schemaOf( |
| """{"a":"2018-12-17T10:11:12.123-01:00"}""", |
| """{"a":"2018-12-16T22:23:24.123-02:00"}""") === fromDDL("a timestamp")) |
| |
| assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":1}""") |
| === fromDDL("a string")) |
| assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":"123"}""") |
| === fromDDL("a string")) |
| |
| assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":null}""") |
| === fromDDL("a timestamp")) |
| assert(schemaOf("""{"a":null}""", """{"a":"2018-12-17T10:11:12.123-01:00"}""") |
| === fromDDL("a timestamp")) |
| } |
| |
| test("roundtrip for timestamp type inferring") { |
| val customSchema = new StructType().add("date", TimestampType) |
| withTempDir { dir => |
| val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json" |
| val timestampsWithFormat = spark.read |
| .option("timestampFormat", "dd/MM/yyyy HH:mm") |
| .option("inferTimestamp", true) |
| .json(datesRecords) |
| assert(timestampsWithFormat.schema === customSchema) |
| |
| timestampsWithFormat.write |
| .format("json") |
| .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") |
| .option(DateTimeUtils.TIMEZONE_OPTION, "UTC") |
| .save(timestampsWithFormatPath) |
| |
| val readBack = spark.read |
| .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") |
| .option(DateTimeUtils.TIMEZONE_OPTION, "UTC") |
| .option("inferTimestamp", true) |
| .json(timestampsWithFormatPath) |
| |
| assert(readBack.schema === customSchema) |
| checkAnswer(readBack, timestampsWithFormat) |
| } |
| } |
| |
| test("SPARK-30960, SPARK-31641: parse date/timestamp string with legacy format") { |
| val ds = Seq( |
| s"{'t': '2020-1-12 3:23:34.12', 'd': '2020-1-12 T'}" |
| ).toDS() |
| val json = spark.read.schema("t timestamp, d date").json(ds) |
| checkAnswer(json, Row( |
| Timestamp.valueOf("2020-1-12 3:23:34.12"), |
| Date.valueOf("2020-1-12"))) |
| } |
| |
| test("exception mode for parsing date/timestamp string") { |
| val ds = Seq("{'t': '2020-01-27T20:06:11.847-08000'}").toDS() |
| val json = spark.read |
| .schema("t timestamp") |
| .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSz") |
| .json(ds) |
| withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") { |
| checkError( |
| exception = intercept[SparkUpgradeException] { |
| json.collect() |
| }, |
| errorClass = "INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER", |
| parameters = Map( |
| "datetime" -> "'2020-01-27T20:06:11.847-08000'", |
| "config" -> "\"spark.sql.legacy.timeParserPolicy\"")) |
| } |
| withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") { |
| checkAnswer(json, Row(Timestamp.valueOf("2020-01-27 20:06:11.847"))) |
| } |
| withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") { |
| checkAnswer(json, Row(null)) |
| } |
| } |
| |
| test("SPARK-37360: Write and infer TIMESTAMP_NTZ values with a non-default pattern") { |
| withTempPath { path => |
| val exp = spark.sql(""" |
| select timestamp_ntz'2020-12-12 12:12:12' as col0 union all |
| select timestamp_ntz'2020-12-12 12:12:12.123456' as col0 |
| """) |
| exp.write |
| .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") |
| .json(path.getAbsolutePath) |
| |
| withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { |
| val res = spark.read |
| .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") |
| .option("inferTimestamp", "true") |
| .json(path.getAbsolutePath) |
| |
| assert(res.dtypes === exp.dtypes) |
| checkAnswer(res, exp) |
| } |
| } |
| } |
| |
| test("SPARK-37360: Write and infer TIMESTAMP_LTZ values with a non-default pattern") { |
| withTempPath { path => |
| val exp = spark.sql(""" |
| select timestamp_ltz'2020-12-12 12:12:12' as col0 union all |
| select timestamp_ltz'2020-12-12 12:12:12.123456' as col0 |
| """) |
| exp.write |
| .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") |
| .json(path.getAbsolutePath) |
| |
| withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) { |
| val res = spark.read |
| .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") |
| .option("inferTimestamp", "true") |
| .json(path.getAbsolutePath) |
| |
| assert(res.dtypes === exp.dtypes) |
| checkAnswer(res, exp) |
| } |
| } |
| } |
| |
| test("SPARK-37360: Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") { |
| withTempPath { path => |
| val exp = spark.sql(""" |
| select |
| timestamp_ntz'2020-12-12 12:12:12' as col1, |
| timestamp_ltz'2020-12-12 12:12:12' as col2 |
| """) |
| |
| exp.write.json(path.getAbsolutePath) |
| |
| val res = spark.read |
| .schema("col1 TIMESTAMP_NTZ, col2 TIMESTAMP_LTZ") |
| .json(path.getAbsolutePath) |
| |
| checkAnswer(res, exp) |
| } |
| } |
| |
| test("SPARK-37360: Timestamp type inference for a column with TIMESTAMP_NTZ values") { |
| withTempPath { path => |
| val exp = spark.sql(""" |
| select timestamp_ntz'2020-12-12 12:12:12' as col0 union all |
| select timestamp_ntz'2020-12-12 12:12:12' as col0 |
| """) |
| |
| exp.write.json(path.getAbsolutePath) |
| |
| val timestampTypes = Seq( |
| SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString, |
| SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) |
| |
| timestampTypes.foreach { timestampType => |
| withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) { |
| val res = spark.read.option("inferTimestamp", "true").json(path.getAbsolutePath) |
| |
| if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { |
| checkAnswer(res, exp) |
| } else { |
| checkAnswer( |
| res, |
| spark.sql(""" |
| select timestamp_ltz'2020-12-12 12:12:12' as col0 union all |
| select timestamp_ltz'2020-12-12 12:12:12' as col0 |
| """) |
| ) |
| } |
| } |
| } |
| } |
| } |
| |
| test("SPARK-37360: Timestamp type inference for a mix of TIMESTAMP_NTZ and TIMESTAMP_LTZ") { |
| withTempPath { path => |
| Seq( |
| """{"col0":"2020-12-12T12:12:12.000"}""", |
| """{"col0":"2020-12-12T17:12:12.000Z"}""", |
| """{"col0":"2020-12-12T17:12:12.000+05:00"}""", |
| """{"col0":"2020-12-12T12:12:12.000"}""" |
| ).toDF("data") |
| .coalesce(1) |
| .write.text(path.getAbsolutePath) |
| |
| for (policy <- Seq("exception", "corrected", "legacy")) { |
| withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> policy) { |
| val res = spark.read.option("inferTimestamp", "true").json(path.getAbsolutePath) |
| |
| // NOTE: |
          // Every value is tested against all candidate types during JSON schema inference, so
          // the sequence ["timestamp_ntz", "timestamp_ltz", "timestamp_ntz"] results in
          // "timestamp_ltz".
| // This is different from CSV where inference starts from the last inferred type. |
| // |
| // This is why the similar test in CSV has a different result in "legacy" mode. |
| |
| val exp = spark.sql(""" |
| select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all |
| select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all |
| select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all |
| select timestamp_ltz'2020-12-12T12:12:12.000' as col0 |
| """) |
| checkAnswer(res, exp) |
| } |
| } |
| } |
| } |
| |
| test("SPARK-37360: Malformed records when reading TIMESTAMP_LTZ as TIMESTAMP_NTZ") { |
| withTempPath { path => |
| Seq( |
| """{"col0": "2020-12-12T12:12:12.000"}""", |
| """{"col0": "2020-12-12T12:12:12.000Z"}""", |
| """{"col0": "2020-12-12T12:12:12.000+05:00"}""", |
| """{"col0": "2020-12-12T12:12:12.000"}""" |
| ).toDF("data") |
| .coalesce(1) |
| .write.text(path.getAbsolutePath) |
| |
| for (timestampNTZFormat <- Seq(None, Some("yyyy-MM-dd'T'HH:mm:ss[.SSS]"))) { |
| val reader = spark.read.schema("col0 TIMESTAMP_NTZ") |
| val res = timestampNTZFormat match { |
| case Some(format) => |
| reader.option("timestampNTZFormat", format).json(path.getAbsolutePath) |
| case None => |
| reader.json(path.getAbsolutePath) |
| } |
| |
| checkAnswer( |
| res, |
| Seq( |
| Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)), |
| Row(null), |
| Row(null), |
| Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)) |
| ) |
| ) |
| } |
| } |
| } |
| |
| test("SPARK-37360: Fail to write TIMESTAMP_NTZ if timestampNTZFormat contains zone offset") { |
| val patterns = Seq( |
| "yyyy-MM-dd HH:mm:ss XXX", |
| "yyyy-MM-dd HH:mm:ss Z", |
| "yyyy-MM-dd HH:mm:ss z") |
| |
| val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0") |
| for (pattern <- patterns) { |
| withTempPath { path => |
| val actualPath = path.toPath.toUri.toURL.toString |
| val err = intercept[SparkException] { |
| exp.write.option("timestampNTZFormat", pattern).json(path.getAbsolutePath) |
| } |
| checkErrorMatchPVals( |
| exception = err, |
| errorClass = "TASK_WRITE_FAILED", |
| parameters = Map("path" -> s"$actualPath.*")) |
| |
| val msg = err.getCause.getMessage |
| assert( |
| msg.contains("Unsupported field: OffsetSeconds") || |
| msg.contains("Unable to extract value") || |
| msg.contains("Unable to extract ZoneId")) |
| } |
| } |
| } |
| |
| test("filters push down") { |
| withTempPath { path => |
| val t = "2019-12-17 00:01:02" |
| Seq( |
| """{"c0": "abc", "c1": {"c2": 1, "c3": "2019-11-14 20:35:30"}}""", |
| s"""{"c0": "def", "c1": {"c2": 2, "c3": "$t"}}""", |
| s"""{"c0": "defa", "c1": {"c2": 3, "c3": "$t"}}""", |
| s"""{"c0": "define", "c1": {"c2": 2, "c3": "$t"}}""").toDF("data") |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
| Seq(true, false).foreach { filterPushdown => |
| withSQLConf(SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> filterPushdown.toString) { |
| Seq("PERMISSIVE", "DROPMALFORMED", "FAILFAST").foreach { mode => |
| val readback = spark.read |
| .option("mode", mode) |
| .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") |
| .schema("c0 string, c1 struct<c2:integer,c3:timestamp>") |
| .json(path.getAbsolutePath) |
| .where($"c1.c2" === 2 && $"c0".startsWith("def")) |
| .select($"c1.c3") |
| assert(readback.count() === 2) |
| checkAnswer(readback, Seq(Row(Timestamp.valueOf(t)), Row(Timestamp.valueOf(t)))) |
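          // One illustrative way to check whether the predicate reached the scan node
          // (assuming the plan prints a PushedFilters entry for this scan):
          //   readback.explain(true)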
| } |
| } |
| } |
| } |
| } |
| |
| test("apply filters to malformed rows") { |
| withSQLConf(SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> "true") { |
| withTempPath { path => |
| Seq( |
| "{}", |
| """{"invalid": 0}""", |
| """{"i":}""", |
| """{"i": 0}""", |
| """{"i": 1, "t": "2020-01-28 01:00:00"}""", |
| """{"t": "2020-01-28 02:00:00"}""", |
| """{"i": "abc", "t": "2020-01-28 03:00:00"}""", |
| """{"i": 2, "t": "2020-01-28 04:00:00", "d": 3.14}""").toDF("data") |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
| val schema = "i INTEGER, t TIMESTAMP" |
| val readback = spark.read |
| .schema(schema) |
| .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") |
| .json(path.getAbsolutePath) |
| // readback: |
| // +----+-------------------+ |
| // |i |t | |
| // +----+-------------------+ |
| // |null|null | |
| // |null|null | |
| // |null|null | |
| // |0 |null | |
| // |1 |2020-01-28 01:00:00| |
| // |null|2020-01-28 02:00:00| |
| // |null|2020-01-28 03:00:00| |
| // |2 |2020-01-28 04:00:00| |
| // +----+-------------------+ |
| checkAnswer( |
| readback.where($"i".isNull && $"t".isNotNull), |
| Seq( |
| Row(null, Timestamp.valueOf("2020-01-28 02:00:00")), |
| Row(null, Timestamp.valueOf("2020-01-28 03:00:00")))) |
| checkAnswer( |
| readback.where($"i" >= 0 && $"t" > "2020-01-28 00:00:00"), |
| Seq( |
| Row(1, Timestamp.valueOf("2020-01-28 01:00:00")), |
| Row(2, Timestamp.valueOf("2020-01-28 04:00:00")))) |
| checkAnswer( |
| readback.where($"t".isNull).select($"i"), |
| Seq(Row(null), Row(null), Row(null), Row(0))) |
| } |
| } |
| } |
| |
| test("case sensitivity of filters references") { |
| Seq(true, false).foreach { filterPushdown => |
| withSQLConf(SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> filterPushdown.toString) { |
| withTempPath { path => |
| Seq( |
| """{"aaa": 0, "BBB": 1}""", |
| """{"AAA": 2, "bbb": 3}""").toDF().write.text(path.getCanonicalPath) |
| withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { |
| val readback = spark.read.schema("aaa integer, BBB integer") |
| .json(path.getCanonicalPath) |
| checkAnswer(readback, Seq(Row(null, null), Row(0, 1))) |
| checkAnswer(readback.filter($"AAA" === 0 && $"bbb" === 1), Seq(Row(0, 1))) |
| checkAnswer(readback.filter($"AAA" === 2 && $"bbb" === 3), Seq()) |
| // Schema inferring |
| checkError( |
| exception = intercept[AnalysisException] { |
| spark.read.json(path.getCanonicalPath).collect() |
| }, |
| errorClass = "COLUMN_ALREADY_EXISTS", |
| parameters = Map("columnName" -> "`aaa`")) |
| } |
| withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { |
| val readback = spark.read.schema("aaa integer, BBB integer") |
| .json(path.getCanonicalPath) |
| checkAnswer(readback, Seq(Row(null, null), Row(0, 1))) |
| checkError( |
| exception = intercept[AnalysisException] { |
| readback.filter($"AAA" === 0 && $"bbb" === 1).collect() |
| }, |
| errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION", |
| parameters = Map("objectName" -> "`AAA`", "proposal" -> "`BBB`, `aaa`"), |
| context = |
| ExpectedContext(fragment = "$", callSitePattern = getCurrentClassCallSitePattern)) |
| // Schema inferring |
| val readback2 = spark.read.json(path.getCanonicalPath) |
| checkAnswer( |
| readback2.filter($"AAA" === 2).select($"AAA", $"bbb"), |
| Seq(Row(2, 3))) |
| checkAnswer(readback2.filter($"aaa" === 2).select($"AAA", $"bbb"), Seq()) |
| } |
| } |
| } |
| } |
| } |
| |
| test("SPARK-32810: JSON data source should be able to read files with " + |
| "escaped glob metacharacter in the paths") { |
| withTempDir { dir => |
| val basePath = dir.getCanonicalPath |
| // test JSON writer / reader without specifying schema |
| val jsonTableName = "{def}" |
| spark.range(3).coalesce(1).write.json(s"$basePath/$jsonTableName") |
| val readback = spark.read |
| .json(s"$basePath/${"""(\[|\]|\{|\})""".r.replaceAllIn(jsonTableName, """\\$1""")}") |
| assert(readback.collect() sameElements Array(Row(0), Row(1), Row(2))) |
| } |
| } |
| |
| test("SPARK-35047: Write Non-ASCII character as codepoint") { |
| // scalastyle:off nonascii |
| withTempPaths(2) { paths => |
| paths.foreach(_.delete()) |
| val seq = Seq("a", "\n", "\u3042") |
| val df = seq.toDF() |
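      // The non-ASCII character (Hiragana A) is expected to be written as its escaped
      // code point, while "a" and the newline keep their usual JSON encodings.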
| |
| val basePath1 = paths(0).getCanonicalPath |
| df.write.option("writeNonAsciiCharacterAsCodePoint", "true") |
| .option("pretty", "false").json(basePath1) |
| val actualText1 = spark.read.option("wholetext", "true").text(basePath1) |
| .sort("value").map(_.getString(0)).collect().mkString |
| val expectedText1 = |
| s"""{"value":"\\n"} |
| |{"value":"\\u3042"} |
| |{"value":"a"} |
| |""".stripMargin |
| assert(actualText1 === expectedText1) |
| |
| val actualJson1 = spark.read.json(basePath1) |
| .sort("value").map(_.getString(0)).collect().mkString |
| val expectedJson1 = "\na\u3042" |
| assert(actualJson1 === expectedJson1) |
| |
      // Test for pretty-printed JSON.
      // If the multiLine option is set to true, the format should be one JSON record per
      // file, so LEAF_NODE_DEFAULT_PARALLELISM is set here to get one record per partition.
| withSQLConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM.key -> s"${seq.length}") { |
| val basePath2 = paths(1).getCanonicalPath |
| df.write.option("writeNonAsciiCharacterAsCodePoint", "true") |
| .option("pretty", "true").json(basePath2) |
| val actualText2 = spark.read.option("wholetext", "true").text(basePath2) |
| .sort("value").map(_.getString(0)).collect().mkString |
| val expectedText2 = |
| s"""{ |
| | "value" : "\\n" |
| |} |
| |{ |
| | "value" : "\\u3042" |
| |} |
| |{ |
| | "value" : "a" |
| |} |
| |""".stripMargin |
| assert(actualText2 === expectedText2) |
| |
| val actualJson2 = spark.read.option("multiLine", "true").json(basePath2) |
| .sort("value").map(_.getString(0)).collect().mkString |
| val expectedJson2 = "\na\u3042" |
| assert(actualJson2 === expectedJson2) |
| } |
| } |
| // scalastyle:on nonascii |
| } |
| |
| test("SPARK-35104: Fix wrong indentation for multiple JSON even if `pretty` option is true") { |
| withSQLConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM.key -> "1") { |
| withTempPath { path => |
| val basePath = path.getCanonicalPath |
| val df = Seq("a", "b", "c").toDF() |
| df.write.option("pretty", "true").json(basePath) |
| |
| val expectedText = |
| s"""{ |
| | "value" : "a" |
| |} |
| |{ |
| | "value" : "b" |
| |} |
| |{ |
| | "value" : "c" |
| |} |
| |""".stripMargin |
| val actualText = spark.read.option("wholetext", "true") |
| .text(basePath).map(_.getString(0)).collect().mkString |
| assert(actualText === expectedText) |
| } |
| } |
| } |
| |
| test("SPARK-35912: turn non-nullable schema into a nullable schema") { |
| // JSON field is missing. |
| val missingFieldInput = """{"c1": 1}""" |
    // JSON field is null.
| val nullValueInput = """{"c1": 1, "c2": null}""" |
| |
| val schema = StructType(Seq( |
| StructField("c1", IntegerType, nullable = false), |
| StructField("c2", IntegerType, nullable = false))) |
| val expected = schema.asNullable |
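    // Reading with an explicit schema must relax nullability: whether the field is missing
    // or explicitly null, the resulting schema should equal `schema.asNullable`.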
| |
| Seq(missingFieldInput, nullValueInput).foreach { jsonString => |
| Seq("DROPMALFORMED", "FAILFAST", "PERMISSIVE").foreach { mode => |
| val json = spark.createDataset( |
| spark.sparkContext.parallelize(jsonString :: Nil))(Encoders.STRING) |
| val df = spark.read |
| .option("mode", mode) |
| .schema(schema) |
| .json(json) |
| assert(df.schema == expected) |
| checkAnswer(df, Row(1, null) :: Nil) |
| } |
| } |
| |
| withSQLConf(SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION.key -> "true") { |
| checkAnswer( |
| spark.read.schema( |
| StructType( |
| StructField("f1", LongType, nullable = false) :: |
| StructField("f2", LongType, nullable = false) :: Nil) |
| ).option("mode", "DROPMALFORMED").json(Seq("""{"f1": 1}""").toDS()), |
| // It is for testing legacy configuration. This is technically a bug as |
| // `0` has to be `null` but the schema is non-nullable. |
| Row(1, 0)) |
| } |
| } |
| |
| test("SPARK-36379: proceed parsing with root nulls in permissive mode") { |
| val exception = intercept[SparkException] { |
| spark.read.option("mode", "failfast") |
| .schema("a string").json(Seq("""[{"a": "str"}, null]""").toDS()).collect() |
| } |
| |
| checkError( |
| exception = exception, |
| errorClass = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION", |
| parameters = Map("badRecord" -> "[null]", "failFastMode" -> "FAILFAST") |
| ) |
| |
| checkError( |
| exception = ExceptionUtils.getRootCause(exception).asInstanceOf[SparkRuntimeException], |
| errorClass = "INVALID_JSON_ROOT_FIELD", |
| parameters = Map.empty |
| ) |
| |
    // Permissive mode should proceed with parsing malformed records (null).
    // Here, since the array fails to parse in the middle, we will return one row.
| checkAnswer( |
| spark.read.option("mode", "permissive") |
| .json(Seq("""[{"a": "str"}, null, {"a": "str"}]""").toDS()), |
| Row(null) :: Nil) |
| } |
| |
| test("SPARK-44079: fix incorrect result when parse array as struct " + |
| "using PERMISSIVE mode with corrupt record") { |
| val data = """[{"a": "incorrect", "b": "correct"}, {"a": "incorrect", "b": "correct"}]""" |
| val schema = new StructType(Array(StructField("a", IntegerType), |
| StructField("b", StringType), StructField("_corrupt_record", StringType))) |
| |
| val result = spark.read |
| .option("mode", "PERMISSIVE") |
| .option("multiline", "true") |
| .schema(schema) |
| .json(Seq(data).toDS()) |
| |
| checkAnswer(result, Seq(Row(null, "correct", data), Row(null, "correct", data))) |
| } |
| |
| test("SPARK-36536: use casting when datetime pattern is not set") { |
| withSQLConf( |
| SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true", |
| SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.UTC.getId) { |
| withTempPath { path => |
| Seq( |
| """{"d":"2021","ts_ltz":"2021","ts_ntz": "2021"}""", |
| """{"d":"2021-01","ts_ltz":"2021-01 ","ts_ntz":"2021-01"}""", |
| """{"d":" 2021-2-1","ts_ltz":"2021-3-02","ts_ntz": "2021-10-1"}""", |
| """{"d":"2021-8-18 00:00:00","ts_ltz":"2021-8-18 21:44:30Z"""" + |
| ""","ts_ntz":"2021-8-18T21:44:30.123"}""" |
| ).toDF().repartition(1).write.text(path.getCanonicalPath) |
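        // With no explicit pattern set, parsing falls back to casting, which tolerates
        // partial dates (e.g. "2021", "2021-01") and surrounding spaces.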
| val readback = spark.read.schema("d date, ts_ltz timestamp_ltz, ts_ntz timestamp_ntz") |
| .json(path.getCanonicalPath) |
| checkAnswer( |
| readback, |
| Seq( |
| Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"), |
| LocalDateTime.of(2021, 1, 1, 0, 0, 0)), |
| Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"), |
| LocalDateTime.of(2021, 1, 1, 0, 0, 0)), |
| Row(LocalDate.of(2021, 2, 1), Instant.parse("2021-03-02T00:00:00Z"), |
| LocalDateTime.of(2021, 10, 1, 0, 0, 0)), |
| Row(LocalDate.of(2021, 8, 18), Instant.parse("2021-08-18T21:44:30Z"), |
| LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000)))) |
| } |
| } |
| } |
| |
| test("SPARK-36830: Support reading and writing ANSI intervals") { |
| Seq( |
| YearMonthIntervalType() -> ((i: Int) => Period.of(i, i, 0)), |
| DayTimeIntervalType() -> ((i: Int) => Duration.ofDays(i).plusSeconds(i)) |
| ).foreach { case (it, f) => |
| val data = (1 to 10).map(i => Row(i, f(i))) |
| val schema = StructType(Array(StructField("d", IntegerType, false), |
| StructField("i", it, false))) |
| withTempPath { file => |
| val df = spark.createDataFrame(sparkContext.parallelize(data), schema) |
| df.write.json(file.getCanonicalPath) |
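        // Without a user-specified schema, the interval column is read back as a string
        // (and the int column as a long), hence the casts in the expected answer below.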
| val df2 = spark.read.json(file.getCanonicalPath) |
| checkAnswer(df2, df.select($"d".cast(LongType), $"i".cast(StringType)).collect().toSeq) |
| val df3 = spark.read.schema(schema).json(file.getCanonicalPath) |
| checkAnswer(df3, df.collect().toSeq) |
| } |
| } |
| } |
| |
| test("SPARK-39731: Correctly parse dates and timestamps with yyyyMMdd pattern") { |
| withTempPath { path => |
| Seq( |
| """{"date": "2020011", "ts": "2020011"}""", |
| """{"date": "20201203", "ts": "20201203"}""").toDF() |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
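      // "2020011" has only 7 digits: the lenient legacy parser still accepts it, while
      // the corrected parser requires an exact yyyyMMdd match and yields null (or an
      // upgrade exception under the "exception" policy).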
| val schema = new StructType() |
| .add("date", DateType) |
| .add("ts", TimestampType) |
| val output = spark.read |
| .schema(schema) |
| .option("dateFormat", "yyyyMMdd") |
| .option("timestampFormat", "yyyyMMdd") |
| .json(path.getAbsolutePath) |
| |
| def check(mode: String, res: Seq[Row]): Unit = { |
| withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) { |
| checkAnswer(output, res) |
| } |
| } |
| |
| check( |
| "legacy", |
| Seq( |
| Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")), |
| Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00")) |
| ) |
| ) |
| |
| check( |
| "corrected", |
| Seq( |
| Row(null, null), |
| Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00")) |
| ) |
| ) |
| |
| intercept[SparkUpgradeException] { |
| check("exception", Nil) |
| } |
| } |
| } |
| |
| test("SPARK-39731: Handle date and timestamp parsing fallback") { |
| withTempPath { path => |
| Seq("""{"date": "2020-01-01", "ts": "2020-01-01"}""").toDF() |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
| val schema = new StructType() |
| .add("date", DateType) |
| .add("ts", TimestampType) |
| |
| def output(enableFallback: Boolean): DataFrame = spark.read |
| .schema(schema) |
| .option("dateFormat", "invalid") |
| .option("timestampFormat", "invalid") |
| .option("enableDateTimeParsingFallback", enableFallback) |
| .json(path.getAbsolutePath) |
| |
| checkAnswer( |
| output(enableFallback = true), |
| Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00"))) |
| ) |
| |
| checkAnswer( |
| output(enableFallback = false), |
| Seq(Row(null, null)) |
| ) |
| } |
| } |
| |
| test("SPARK-40215: enable parsing fallback for JSON in CORRECTED mode with a SQL config") { |
| withTempPath { path => |
| Seq("""{"date": "2020-01-01", "ts": "2020-01-01"}""").toDF() |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
| |
| for (fallbackEnabled <- Seq(true, false)) { |
| withSQLConf( |
| SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED", |
| SQLConf.LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") { |
| val df = spark.read |
| .schema("date date, ts timestamp") |
| .option("dateFormat", "invalid") |
| .option("timestampFormat", "invalid") |
| .json(path.getAbsolutePath) |
| |
| if (fallbackEnabled) { |
| checkAnswer( |
| df, |
| Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00"))) |
| ) |
| } else { |
| checkAnswer( |
| df, |
| Seq(Row(null, null)) |
| ) |
| } |
| } |
| } |
| } |
| } |
| |
| test("SPARK-40496: disable parsing fallback when the date/timestamp format is provided") { |
| // The test verifies that the fallback can be disabled by providing dateFormat or |
| // timestampFormat without any additional configuration. |
| // |
| // We also need to disable "legacy" parsing mode that implicitly enables parsing fallback. |
| for (policy <- Seq("exception", "corrected")) { |
| withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> policy) { |
| withTempPath { path => |
| Seq("""{"col": "2020-01-01"}""").toDF() |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
| |
| var df = spark.read.schema("col date").option("dateFormat", "yyyy/MM/dd") |
| .json(path.getAbsolutePath) |
| checkAnswer(df, Seq(Row(null))) |
| |
| df = spark.read.schema("col timestamp").option("timestampFormat", "yyyy/MM/dd HH:mm:ss") |
| .json(path.getAbsolutePath) |
| |
| checkAnswer(df, Seq(Row(null))) |
| } |
| } |
| } |
| } |
| |
| test("SPARK-40646: parse subsequent fields if the first JSON field does not match schema") { |
| // In this example, the first record has "a.y" as boolean but it needs to be an object. |
| // We should parse "a" as null but continue parsing "b" correctly as it is valid. |
| withTempPath { path => |
| Seq( |
| """{"a": {"x": 1, "y": true}, "b": {"x": 1}}""", |
| """{"a": {"x": 2}, "b": {"x": 2}}"""").toDF() |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
| |
| for (enablePartialResults <- Seq(true, false)) { |
| withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> s"$enablePartialResults") { |
| val df = spark.read |
| .schema("a struct<x: int, y: struct<x: int>>, b struct<x: int>") |
| .json(path.getAbsolutePath) |
| |
| if (enablePartialResults) { |
| checkAnswer( |
| df, |
| Seq(Row(Row(1, null), Row(1)), Row(Row(2, null), Row(2))) |
| ) |
| } else { |
| checkAnswer( |
| df, |
| Seq(Row(null, null), Row(Row(2, null), Row(2))) |
| ) |
| } |
| } |
| } |
| } |
| } |
| |
| test("SPARK-44940: fully parse the record except f1 if partial results are enabled") { |
| withTempPath { path => |
| Seq( |
| """{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}""", |
| """{"a1": "BBB", "a2": [{"f1": 12, "f2": ""}], "a3": "id2", "a4": "YYY"}""").toDF() |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
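      // The empty string in "f1" conflicts with the type inferred from the value 12, so
      // with partial results only "f1" is nulled out; without them, every field after the
      // mismatched one is nulled out for that record.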
| |
| withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { |
| val df = spark.read.json(path.getAbsolutePath) |
| checkAnswer( |
| df, |
| Seq( |
| Row("AAA", Seq(Row(null, "")), "id1", "XXX"), |
| Row("BBB", Seq(Row(12, "")), "id2", "YYY") |
| ) |
| ) |
| } |
| |
| withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { |
| val df = spark.read.json(path.getAbsolutePath) |
| checkAnswer( |
| df, |
| Seq( |
| Row("AAA", null, null, null), |
| Row("BBB", Seq(Row(12, "")), "id2", "YYY") |
| ) |
| ) |
| } |
| } |
| } |
| |
| test("SPARK-44940: fully parse primitive map if partial results are enabled") { |
| withTempPath { path => |
| Seq( |
| """{"a1": "AAA", "a2": {"f1": "", "f2": ""}, "a3": "id1"}""", |
| """{"a1": "BBB", "a2": {"f1": 12, "f2": ""}, "a3": "id2"}""").toDF() |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
| |
| val schema = "a1 string, a2 map<string, int>, a3 string" |
| |
| withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { |
| val df = spark.read.schema(schema).json(path.getAbsolutePath) |
        // Although the keys match the string type and some values match the integer type,
        // the entire map is marked as null because some of the values do not match the type.
| checkAnswer( |
| df, |
| Seq( |
| Row("AAA", null, "id1"), |
| Row("BBB", null, "id2") |
| ) |
| ) |
| } |
| |
| withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { |
| val df = spark.read.schema(schema).json(path.getAbsolutePath) |
| checkAnswer( |
| df, |
| Seq( |
| Row("AAA", null, null), |
| Row("BBB", null, null) |
| ) |
| ) |
| } |
| } |
| } |
| |
| test("SPARK-44940: fully parse map of structs if partial results are enabled") { |
| withTempPath { path => |
| Seq( |
| """{"a1": "AAA", "a2": {"key": {"f1": "", "f2": ""}}, "a3": "id1"}""", |
| """{"a1": "BBB", "a2": {"key": {"f1": 12, "f2": ""}}, "a3": "id2"}""").toDF() |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
| |
| val schema = "a1 string, a2 map<string, struct<f1: int, f2: string>>, a3 string" |
| |
| withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { |
| val df = spark.read.schema(schema).json(path.getAbsolutePath) |
| checkAnswer( |
| df, |
| Seq( |
| Row("AAA", Map("key" -> Row(null, "")), "id1"), |
| Row("BBB", Map("key" -> Row(12, "")), "id2") |
| ) |
| ) |
| } |
| |
| withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { |
| val df = spark.read.schema(schema).json(path.getAbsolutePath) |
| checkAnswer( |
| df, |
| Seq( |
| Row("AAA", null, null), |
| Row("BBB", Map("key" -> Row(12, "")), "id2") |
| ) |
| ) |
| } |
| } |
| } |
| |
| test("SPARK-44940: fully parse primitive arrays if partial results are enabled") { |
| withTempPath { path => |
| Seq( |
| """{"a1": "AAA", "a2": {"f1": [""]}, "a3": "id1", "a4": "XXX"}""", |
| """{"a1": "BBB", "a2": {"f1": [12]}, "a3": "id2", "a4": "YYY"}""").toDF() |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
| |
| withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { |
| val df = spark.read.json(path.getAbsolutePath) |
| checkAnswer( |
| df, |
| Seq( |
| Row("AAA", Row(null), "id1", "XXX"), |
| Row("BBB", Row(Seq(12)), "id2", "YYY") |
| ) |
| ) |
| } |
| |
| withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { |
| val df = spark.read.json(path.getAbsolutePath) |
| checkAnswer( |
| df, |
| Seq( |
| Row("AAA", null, null, null), |
| Row("BBB", Row(Seq(12)), "id2", "YYY") |
| ) |
| ) |
| } |
| } |
| } |
| |
| test("SPARK-44940: fully parse array of arrays if partial results are enabled") { |
| withTempPath { path => |
| Seq( |
| """{"a1": "AAA", "a2": [[12, ""], [""]], "a3": "id1", "a4": "XXX"}""", |
| """{"a1": "BBB", "a2": [[12, 34], [""]], "a3": "id2", "a4": "YYY"}""").toDF() |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
| |
| // We cannot parse `array<array<int>>` type because one of the inner arrays contains a |
| // mismatched type. |
| withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") { |
| val df = spark.read.json(path.getAbsolutePath) |
| checkAnswer( |
| df, |
| Seq( |
| Row("AAA", null, "id1", "XXX"), |
| Row("BBB", null, "id2", "YYY") |
| ) |
| ) |
| } |
| |
| withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") { |
| val df = spark.read.json(path.getAbsolutePath) |
| checkAnswer( |
| df, |
| Seq( |
| Row("AAA", null, "id1", "XXX"), |
| Row("BBB", null, "id2", "YYY") |
| ) |
| ) |
| } |
| } |
| } |
| |
| test("SPARK-40667: validate JSON Options") { |
| assert(JSONOptions.getAllOptions.size == 29) |
    // Please add validation for any new JSON options here.
| assert(JSONOptions.isValidOption("samplingRatio")) |
| assert(JSONOptions.isValidOption("primitivesAsString")) |
| assert(JSONOptions.isValidOption("prefersDecimal")) |
| assert(JSONOptions.isValidOption("allowComments")) |
| assert(JSONOptions.isValidOption("allowUnquotedFieldNames")) |
| assert(JSONOptions.isValidOption("allowSingleQuotes")) |
| assert(JSONOptions.isValidOption("allowNumericLeadingZeros")) |
| assert(JSONOptions.isValidOption("allowNonNumericNumbers")) |
| assert(JSONOptions.isValidOption("allowBackslashEscapingAnyCharacter")) |
| assert(JSONOptions.isValidOption("allowUnquotedControlChars")) |
| assert(JSONOptions.isValidOption("compression")) |
| assert(JSONOptions.isValidOption("mode")) |
| assert(JSONOptions.isValidOption("dropFieldIfAllNull")) |
| assert(JSONOptions.isValidOption("ignoreNullFields")) |
| assert(JSONOptions.isValidOption("locale")) |
| assert(JSONOptions.isValidOption("dateFormat")) |
| assert(JSONOptions.isValidOption("timestampFormat")) |
| assert(JSONOptions.isValidOption("timestampNTZFormat")) |
| assert(JSONOptions.isValidOption("enableDateTimeParsingFallback")) |
| assert(JSONOptions.isValidOption("multiLine")) |
| assert(JSONOptions.isValidOption("lineSep")) |
| assert(JSONOptions.isValidOption("pretty")) |
| assert(JSONOptions.isValidOption("inferTimestamp")) |
| assert(JSONOptions.isValidOption("columnNameOfCorruptRecord")) |
| assert(JSONOptions.isValidOption("timeZone")) |
| assert(JSONOptions.isValidOption("writeNonAsciiCharacterAsCodePoint")) |
| assert(JSONOptions.isValidOption("singleVariantColumn")) |
| assert(JSONOptions.isValidOption("encoding")) |
| assert(JSONOptions.isValidOption("charset")) |
    // Please add validation for any new JSON options that have an alternative name here.
| assert(JSONOptions.getAlternativeOption("encoding").contains("charset")) |
| assert(JSONOptions.getAlternativeOption("charset").contains("encoding")) |
| assert(JSONOptions.getAlternativeOption("dateFormat").isEmpty) |
| } |
| |
| test("SPARK-25159: json schema inference should only trigger one job") { |
| withTempPath { path => |
      // This test proves that `JsonInferSchema` does not use `RDD#toLocalIterator`, which
      // would trigger one Spark job per RDD partition.
| Seq(1 -> "a", 2 -> "b").toDF("i", "p") |
        // The data set has 2 partitions, so Spark will write at least 2 json files.
        // Use a non-splittable compression (gzip) to make sure the json scan RDD has at
        // least 2 partitions.
| .write.partitionBy("p") |
| .option("compression", GZIP.lowerCaseName()).json(path.getCanonicalPath) |
| |
| val numJobs = new AtomicLong(0) |
| sparkContext.addSparkListener(new SparkListener { |
| override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { |
| numJobs.incrementAndGet() |
| } |
| }) |
| |
| val df = spark.read.json(path.getCanonicalPath) |
| assert(df.columns === Array("i", "p")) |
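      // Wait until all listener events are delivered before checking the job counter.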
| spark.sparkContext.listenerBus.waitUntilEmpty() |
| assert(numJobs.get() == 1L) |
| } |
| } |
| |
| test("SPARK-35320: Reading JSON with key type different to String in a map should fail") { |
| Seq( |
| (MapType(IntegerType, StringType), """{"1": "test"}"""), |
| (StructType(Seq(StructField("test", MapType(IntegerType, StringType)))), |
| """"test": {"1": "test"}"""), |
| (ArrayType(MapType(IntegerType, StringType)), """[{"1": "test"}]"""), |
| (MapType(StringType, MapType(IntegerType, StringType)), """{"key": {"1" : "test"}}""") |
| ).foreach { case (schema, jsonData) => |
| withTempDir { dir => |
| val colName = "col" |
| val jsonDataSchema = StructType(Seq(StructField(colName, schema))) |
| |
| checkError( |
| exception = intercept[AnalysisException]( |
| spark.read.schema(jsonDataSchema).json(Seq(jsonData).toDS()).collect() |
| ), |
| errorClass = "INVALID_JSON_SCHEMA_MAP_TYPE", |
| parameters = Map("jsonSchema" -> toSQLType(jsonDataSchema))) |
| |
| val jsonDir = new File(dir, "json").getCanonicalPath |
| Seq(jsonData).toDF(colName).write.json(jsonDir) |
| val jsonDirSchema = StructType(Seq(StructField(colName, schema))) |
| checkError( |
| exception = intercept[AnalysisException]( |
| spark.read.schema(jsonDirSchema).json(jsonDir).collect() |
| ), |
| errorClass = "INVALID_JSON_SCHEMA_MAP_TYPE", |
| parameters = Map("jsonSchema" -> toSQLType(jsonDirSchema))) |
| } |
| } |
| } |
| |
| test("SPARK-47704: Handle partial parsing of array<map>") { |
| withTempPath { path => |
| Seq("""{"a":[{"key":{"b":0}}]}""").toDF() |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
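      // "b" is declared as boolean but the data holds the number 0, so only the innermost
      // struct field fails to parse when partial results are enabled.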
| |
| for (enablePartialResults <- Seq(true, false)) { |
| withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> s"$enablePartialResults") { |
| val df = spark.read |
| .schema("a array<map<string, struct<b boolean>>>") |
| .json(path.getAbsolutePath) |
| |
| if (enablePartialResults) { |
| checkAnswer(df, Seq(Row(Array(Map("key" -> Row(null)))))) |
| } else { |
| checkAnswer(df, Seq(Row(null))) |
| } |
| } |
| } |
| } |
| } |
| |
| test("SPARK-47704: Handle partial parsing of map<string, array>") { |
| withTempPath { path => |
| Seq("""{"a":{"key":[{"b":0}]}}""").toDF() |
| .repartition(1) |
| .write.text(path.getAbsolutePath) |
| |
| for (enablePartialResults <- Seq(true, false)) { |
| withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> s"$enablePartialResults") { |
| val df = spark.read |
| .schema("a map<string, array<struct<b boolean>>>") |
| .json(path.getAbsolutePath) |
| |
| if (enablePartialResults) { |
| checkAnswer(df, Seq(Row(Map("key" -> Seq(Row(null)))))) |
| } else { |
| checkAnswer(df, Seq(Row(null))) |
| } |
| } |
| } |
| } |
| } |
| |
| test("SPARK-48148: values are unchanged when read as string") { |
| withTempPath { path => |
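      // Writes the given JSON record and reads the "data" field back as a string with
      // exact string parsing enabled and disabled, comparing against the expected outputs.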
| def extractData( |
| jsonString: String, |
| expectedInexactData: Seq[String], |
| expectedExactData: Seq[String], |
| multiLine: Boolean = false): Unit = { |
| Seq(jsonString).toDF() |
| .repartition(1) |
| .write |
| .mode("overwrite") |
| .text(path.getAbsolutePath) |
| |
| withClue("Exact string parsing") { |
| withSQLConf(SQLConf.JSON_EXACT_STRING_PARSING.key -> "true") { |
| val df = spark.read |
| .schema("data STRING") |
| .option("multiLine", multiLine.toString) |
| .json(path.getAbsolutePath) |
| checkAnswer(df, expectedExactData.map(d => Row(d))) |
| } |
| } |
| |
| withClue("Inexact string parsing") { |
| withSQLConf(SQLConf.JSON_EXACT_STRING_PARSING.key -> "false") { |
| val df = spark.read |
| .schema("data STRING") |
| .option("multiLine", multiLine.toString) |
| .json(path.getAbsolutePath) |
| checkAnswer(df, expectedInexactData.map(d => Row(d))) |
| } |
| } |
| } |
| extractData( |
| """{"data": {"white": "space"}}""", |
| expectedInexactData = Seq("""{"white":"space"}"""), |
| expectedExactData = Seq("""{"white": "space"}""") |
| ) |
| extractData( |
| """{"data": ["white", "space"]}""", |
| expectedInexactData = Seq("""["white","space"]"""), |
| expectedExactData = Seq("""["white", "space"]""") |
| ) |
| val granularFloat = "-999.99999999999999999999999999999999995" |
| extractData( |
| s"""{"data": {"v": ${granularFloat}}}""", |
| expectedInexactData = Seq("""{"v":-1000.0}"""), |
| expectedExactData = Seq(s"""{"v": ${granularFloat}}""") |
| ) |
| extractData( |
| s"""{"data": {"white":\n"space"}}""", |
| expectedInexactData = Seq("""{"white":"space"}"""), |
| expectedExactData = Seq(s"""{"white":\n"space"}"""), |
| multiLine = true |
| ) |
| } |
| } |
| } |
| |
| class JsonV1Suite extends JsonSuite { |
| override protected def sparkConf: SparkConf = |
| super |
| .sparkConf |
| .set(SQLConf.USE_V1_SOURCE_LIST, "json") |
| } |
| |
| class JsonV2Suite extends JsonSuite { |
| override protected def sparkConf: SparkConf = |
| super |
| .sparkConf |
| .set(SQLConf.USE_V1_SOURCE_LIST, "") |
| |
| test("get pushed filters") { |
| val attr = "col" |
| def getBuilder(path: String): JsonScanBuilder = { |
| val fileIndex = new InMemoryFileIndex( |
| spark, |
| Seq(new org.apache.hadoop.fs.Path(path, "file.json")), |
| Map.empty, |
| None, |
| NoopCache) |
| val schema = new StructType().add(attr, IntegerType) |
| val options = CaseInsensitiveStringMap.empty() |
| new JsonScanBuilder(spark, fileIndex, schema, schema, options) |
| } |
| val filters: Array[sources.Filter] = Array(sources.IsNotNull(attr)) |
| withSQLConf(SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> "true") { |
| withTempPath { file => |
| val scanBuilder = getBuilder(file.getCanonicalPath) |
| assert(scanBuilder.pushDataFilters(filters) === filters) |
| } |
| } |
| |
| withSQLConf(SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> "false") { |
| withTempPath { file => |
| val scanBuilder = getBuilder(file.getCanonicalPath) |
| assert(scanBuilder.pushDataFilters(filters) === Array.empty[sources.Filter]) |
| } |
| } |
| } |
| } |
| |
| class JsonLegacyTimeParserSuite extends JsonSuite { |
| override protected def sparkConf: SparkConf = |
| super |
| .sparkConf |
| .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy") |
| } |