/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.SchemaRequiredDataSource
import org.apache.spark.sql.connector.catalog.InMemoryPartitionTableCatalog
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.SimpleInsertSource
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.sql.types._
// The base trait for char/varchar tests that need to be run with different table implementations.
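// For orientation, a sketch of the semantics exercised below (illustrative only;
// the table and column names here are made up):
//   CREATE TABLE ct(c CHAR(5), v VARCHAR(5)) ...
//   INSERT INTO ct VALUES ('ab', 'ab')
//   -- c is stored space-padded to exactly 5 chars ("ab   "), v is stored as-is ("ab");
//   -- values longer than 5 fail the write-side length check unless the excess
//   -- consists only of trailing spaces, which are trimmed.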
trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
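// The table provider used by CREATE TABLE throughout the suite, supplied by the
// concrete suites below (e.g. the V1 "parquet" source or a V2 catalog format).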
def format: String
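// Verifies that a char/varchar column is exposed as StringType in the table schema,
// while the raw char/varchar type is preserved in the column metadata.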
def checkColType(f: StructField, dt: DataType): Unit = {
assert(f.dataType == CharVarcharUtils.replaceCharVarcharWithString(dt))
assert(CharVarcharUtils.getRawType(f.metadata) == Some(dt))
}
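// Checks both the read-back schema and the value of the second column: CHAR(n)
// values come back padded/trimmed to exactly n characters, VARCHAR(n) values
// trimmed to at most n characters.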
def checkPlainResult(df: DataFrame, dt: String, insertVal: String): Unit = {
val dataType = CatalystSqlParser.parseDataType(dt)
checkColType(df.schema(1), dataType)
dataType match {
case CharType(len) =>
// a char value is padded with spaces if its length is <= len, or trimmed if > len
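// e.g. for CHAR(5): "a" is stored as "a    "; "a      " (7 chars) becomes "a    "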
val fixLenStr = if (insertVal != null) {
insertVal.take(len).padTo(len, ' ')
} else null
checkAnswer(df, Row("1", fixLenStr))
case VarcharType(len) =>
// a varchar value is kept unchanged if its length is <= len, or trimmed if > len
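// e.g. for VARCHAR(5): "a  " stays "a  "; "a      " (7 chars) becomes "a    "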
val varLenStrWithUpperBound = if (insertVal != null) {
insertVal.take(len)
} else null
checkAnswer(df, Row("1", varLenStrWithUpperBound))
}
}
test("apply char padding/trimming and varchar trimming: top-level columns") {
Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c $typ) USING $format")
(0 to 5).map(n => "a" + " " * n).foreach { v =>
sql(s"INSERT OVERWRITE t VALUES ('1', '$v')")
checkPlainResult(spark.table("t"), typ, v)
}
sql("INSERT OVERWRITE t VALUES ('1', null)")
checkPlainResult(spark.table("t"), typ, null)
}
}
}
test("char type values should be padded or trimmed: partitioned columns") {
// via dynamic partitioned columns
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c CHAR(5)) USING $format PARTITIONED BY (c)")
(0 to 5).map(n => "a" + " " * n).foreach { v =>
sql(s"INSERT OVERWRITE t VALUES ('1', '$v')")
checkPlainResult(spark.table("t"), "CHAR(5)", v)
}
}
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c CHAR(5)) USING $format PARTITIONED BY (c)")
(0 to 5).map(n => "a" + " " * n).foreach { v =>
// via dynamic partitioned columns with drop partition command
sql(s"INSERT INTO t VALUES ('1', '$v')")
checkPlainResult(spark.table("t"), "CHAR(5)", v)
sql(s"ALTER TABLE t DROP PARTITION(c='a')")
checkAnswer(spark.table("t"), Nil)
// via static partitioned columns with drop partition command
sql(s"INSERT INTO t PARTITION (c ='$v') VALUES ('1')")
checkPlainResult(spark.table("t"), "CHAR(5)", v)
sql(s"ALTER TABLE t DROP PARTITION(c='a')")
checkAnswer(spark.table("t"), Nil)
}
}
}
test("char type values should not be padded when charVarcharAsString is true") {
withSQLConf(SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key -> "true") {
withTable("t") {
sql(s"CREATE TABLE t(a STRING, b CHAR(5), c CHAR(5)) USING $format partitioned by (c)")
sql("INSERT INTO t VALUES ('abc', 'abc', 'abc')")
checkAnswer(sql("SELECT b FROM t WHERE b='abc'"), Row("abc"))
checkAnswer(sql("SELECT b FROM t WHERE b in ('abc')"), Row("abc"))
checkAnswer(sql("SELECT c FROM t WHERE c='abc'"), Row("abc"))
checkAnswer(sql("SELECT c FROM t WHERE c in ('abc')"), Row("abc"))
}
}
}
test("varchar type values length check and trim: partitioned columns") {
(0 to 5).foreach { n =>
// SPARK-34192: we need to create a new table for each round of tests because
// trailing spaces in partition columns are treated differently across databases.
// MySQL and Derby (used in tests) consider 'a' = 'a ',
// whereas others (e.g. Postgres, Oracle) don't exhibit this behavior.
// see more at:
// https://issues.apache.org/jira/browse/HIVE-13618
// https://issues.apache.org/jira/browse/SPARK-34192
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c VARCHAR(5)) USING $format PARTITIONED BY (c)")
val v = "a" + " " * n
// via dynamic partitioned columns
sql(s"INSERT INTO t VALUES ('1', '$v')")
checkPlainResult(spark.table("t"), "VARCHAR(5)", v)
sql(s"ALTER TABLE t DROP PARTITION(c='$v')")
checkAnswer(spark.table("t"), Nil)
// via static partitioned columns
sql(s"INSERT INTO t PARTITION (c='$v') VALUES ('1')")
checkPlainResult(spark.table("t"), "VARCHAR(5)", v)
sql(s"ALTER TABLE t DROP PARTITION(c='$v')")
checkAnswer(spark.table("t"), Nil)
}
}
}
test("oversize char/varchar values for alter table partition operations") {
Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY (c)")
Seq("ADD", "DROP").foreach { op =>
checkError(
exception = intercept[SparkRuntimeException] {
sql(s"ALTER TABLE t $op PARTITION(c='abcdef')")
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
checkError(
exception = intercept[SparkRuntimeException] {
sql(s"ALTER TABLE t PARTITION (c='abcdef') RENAME TO PARTITION (c='2')")
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
checkError(
exception = intercept[SparkRuntimeException] {
sql(s"ALTER TABLE t PARTITION (c='1') RENAME TO PARTITION (c='abcdef')")
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
}
test("SPARK-34233: char/varchar with null value for partitioned columns") {
Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY (c)")
sql("INSERT INTO t VALUES ('1', null)")
checkPlainResult(spark.table("t"), typ, null)
sql("INSERT OVERWRITE t VALUES ('1', null)")
checkPlainResult(spark.table("t"), typ, null)
sql("INSERT OVERWRITE t PARTITION (c=null) VALUES ('1')")
checkPlainResult(spark.table("t"), typ, null)
sql("ALTER TABLE t DROP PARTITION(c=null)")
checkAnswer(spark.table("t"), Nil)
}
}
}
test("char type values should be padded: nested in struct") {
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c STRUCT<c: CHAR(5)>) USING $format")
sql("INSERT INTO t VALUES ('1', struct('a'))")
checkAnswer(spark.table("t"), Row("1", Row("a" + " " * 4)))
checkColType(spark.table("t").schema(1), new StructType().add("c", CharType(5)))
sql("INSERT OVERWRITE t VALUES ('1', null)")
checkAnswer(spark.table("t"), Row("1", null))
sql("INSERT OVERWRITE t VALUES ('1', struct(null))")
checkAnswer(spark.table("t"), Row("1", Row(null)))
}
}
test("char type values should be padded: nested in array") {
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c ARRAY<CHAR(5)>) USING $format")
sql("INSERT INTO t VALUES ('1', array('a', 'ab'))")
checkAnswer(spark.table("t"), Row("1", Seq("a" + " " * 4, "ab" + " " * 3)))
checkColType(spark.table("t").schema(1), ArrayType(CharType(5)))
sql("INSERT OVERWRITE t VALUES ('1', null)")
checkAnswer(spark.table("t"), Row("1", null))
sql("INSERT OVERWRITE t VALUES ('1', array(null))")
checkAnswer(spark.table("t"), Row("1", Seq(null)))
}
}
test("char type values should be padded: nested in map key") {
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c MAP<CHAR(5), STRING>) USING $format")
sql("INSERT INTO t VALUES ('1', map('a', 'ab'))")
checkAnswer(spark.table("t"), Row("1", Map(("a" + " " * 4, "ab"))))
checkColType(spark.table("t").schema(1), MapType(CharType(5), StringType))
sql("INSERT OVERWRITE t VALUES ('1', null)")
checkAnswer(spark.table("t"), Row("1", null))
}
}
test("char type values should be padded: nested in map value") {
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c MAP<STRING, CHAR(5)>) USING $format")
sql("INSERT INTO t VALUES ('1', map('a', 'ab'))")
checkAnswer(spark.table("t"), Row("1", Map(("a", "ab" + " " * 3))))
checkColType(spark.table("t").schema(1), MapType(StringType, CharType(5)))
sql("INSERT OVERWRITE t VALUES ('1', null)")
checkAnswer(spark.table("t"), Row("1", null))
sql("INSERT OVERWRITE t VALUES ('1', map('a', null))")
checkAnswer(spark.table("t"), Row("1", Map("a" -> null)))
}
}
test("char type values should be padded: nested in both map key and value") {
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c MAP<CHAR(5), CHAR(10)>) USING $format")
sql("INSERT INTO t VALUES ('1', map('a', 'ab'))")
checkAnswer(spark.table("t"), Row("1", Map(("a" + " " * 4, "ab" + " " * 8))))
checkColType(spark.table("t").schema(1), MapType(CharType(5), CharType(10)))
sql("INSERT OVERWRITE t VALUES ('1', null)")
checkAnswer(spark.table("t"), Row("1", null))
}
}
test("char type values should be padded: nested in struct of array") {
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c STRUCT<c: ARRAY<CHAR(5)>>) USING $format")
sql("INSERT INTO t VALUES ('1', struct(array('a', 'ab')))")
checkAnswer(spark.table("t"), Row("1", Row(Seq("a" + " " * 4, "ab" + " " * 3))))
checkColType(spark.table("t").schema(1),
new StructType().add("c", ArrayType(CharType(5))))
sql("INSERT OVERWRITE t VALUES ('1', null)")
checkAnswer(spark.table("t"), Row("1", null))
sql("INSERT OVERWRITE t VALUES ('1', struct(null))")
checkAnswer(spark.table("t"), Row("1", Row(null)))
sql("INSERT OVERWRITE t VALUES ('1', struct(array(null)))")
checkAnswer(spark.table("t"), Row("1", Row(Seq(null))))
}
}
test("char type values should be padded: nested in array of struct") {
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c ARRAY<STRUCT<c: CHAR(5)>>) USING $format")
sql("INSERT INTO t VALUES ('1', array(struct('a'), struct('ab')))")
checkAnswer(spark.table("t"), Row("1", Seq(Row("a" + " " * 4), Row("ab" + " " * 3))))
checkColType(spark.table("t").schema(1),
ArrayType(new StructType().add("c", CharType(5))))
sql("INSERT OVERWRITE t VALUES ('1', null)")
checkAnswer(spark.table("t"), Row("1", null))
sql("INSERT OVERWRITE t VALUES ('1', array(null))")
checkAnswer(spark.table("t"), Row("1", Seq(null)))
sql("INSERT OVERWRITE t VALUES ('1', array(struct(null)))")
checkAnswer(spark.table("t"), Row("1", Seq(Row(null))))
}
}
test("char type values should be padded: nested in array of array") {
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c ARRAY<ARRAY<CHAR(5)>>) USING $format")
sql("INSERT INTO t VALUES ('1', array(array('a', 'ab')))")
checkAnswer(spark.table("t"), Row("1", Seq(Seq("a" + " " * 4, "ab" + " " * 3))))
checkColType(spark.table("t").schema(1), ArrayType(ArrayType(CharType(5))))
sql("INSERT OVERWRITE t VALUES ('1', null)")
checkAnswer(spark.table("t"), Row("1", null))
sql("INSERT OVERWRITE t VALUES ('1', array(null))")
checkAnswer(spark.table("t"), Row("1", Seq(null)))
sql("INSERT OVERWRITE t VALUES ('1', array(array(null)))")
checkAnswer(spark.table("t"), Row("1", Seq(Seq(null))))
}
}
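// Runs the given test body twice, once with a CHAR column type and once with VARCHAR.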
private def testTableWrite(f: String => Unit): Unit = {
withTable("t") { f("char") }
withTable("t") { f("varchar") }
}
test("length check for input string values: top-level columns") {
testTableWrite { typeName =>
sql(s"CREATE TABLE t(c $typeName(5)) USING $format")
sql("INSERT INTO t VALUES (null)")
checkAnswer(spark.table("t"), Row(null))
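// The EXCEED_LIMIT_LENGTH failure may surface as a bare SparkRuntimeException or be
// wrapped in an extra SparkException, depending on where the write fails, so unwrap
// the cause chain before checking the error class.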
val e = intercept[SparkException] {
sql("INSERT INTO t VALUES ('123456')")
}
checkError(
exception = e.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
test("length check for input string values: partitioned columns") {
// DS V2 doesn't support partitioned tables.
if (!conf.contains(SQLConf.DEFAULT_CATALOG.key)) {
val tableName = "t"
testTableWrite { typeName =>
sql(s"CREATE TABLE $tableName(i INT, c $typeName(5)) USING $format PARTITIONED BY (c)")
sql(s"INSERT INTO $tableName VALUES (1, null)")
checkAnswer(spark.table(tableName), Row(1, null))
val e = intercept[SparkException](sql(s"INSERT INTO $tableName VALUES (1, '123456')"))
checkError(
exception = e.getCause.asInstanceOf[SparkException],
errorClass = "TASK_WRITE_FAILED",
parameters = Map("path" -> s".*$tableName.*"),
matchPVals = true
)
}
}
}
test("length check for input string values: nested in struct") {
testTableWrite { typeName =>
sql(s"CREATE TABLE t(c STRUCT<c: $typeName(5)>) USING $format")
sql("INSERT INTO t SELECT struct(null)")
checkAnswer(spark.table("t"), Row(Row(null)))
checkError(
exception = intercept[SparkRuntimeException] {
sql("INSERT INTO t SELECT struct('123456')")
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
test("length check for input string values: nested in array") {
testTableWrite { typeName =>
sql(s"CREATE TABLE t(c ARRAY<$typeName(5)>) USING $format")
sql("INSERT INTO t VALUES (array(null))")
checkAnswer(spark.table("t"), Row(Seq(null)))
val e = intercept[SparkException] {
sql("INSERT INTO t VALUES (array('a', '123456'))")
}
checkError(
exception = e.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
test("length check for input string values: nested in map key") {
testTableWrite { typeName =>
sql(s"CREATE TABLE t(c MAP<$typeName(5), STRING>) USING $format")
val e = intercept[SparkException](sql("INSERT INTO t VALUES (map('123456', 'a'))"))
checkError(
exception = e.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
test("length check for input string values: nested in map value") {
testTableWrite { typeName =>
sql(s"CREATE TABLE t(c MAP<STRING, $typeName(5)>) USING $format")
sql("INSERT INTO t VALUES (map('a', null))")
checkAnswer(spark.table("t"), Row(Map("a" -> null)))
val e = intercept[SparkException](sql("INSERT INTO t VALUES (map('a', '123456'))"))
checkError(
exception = e.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
test("length check for input string values: nested in both map key and value") {
testTableWrite { typeName =>
sql(s"CREATE TABLE t(c MAP<$typeName(5), $typeName(5)>) USING $format")
val e1 = intercept[SparkException](sql("INSERT INTO t VALUES (map('123456', 'a'))"))
checkError(
exception = e1.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
val e2 = intercept[SparkException](sql("INSERT INTO t VALUES (map('a', '123456'))"))
checkError(
exception = e2.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
test("length check for input string values: nested in struct of array") {
testTableWrite { typeName =>
sql(s"CREATE TABLE t(c STRUCT<c: ARRAY<$typeName(5)>>) USING $format")
sql("INSERT INTO t SELECT struct(array(null))")
checkAnswer(spark.table("t"), Row(Row(Seq(null))))
val e = intercept[SparkException](sql("INSERT INTO t SELECT struct(array('123456'))"))
checkError(
exception = e.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
test("length check for input string values: nested in array of struct") {
testTableWrite { typeName =>
sql(s"CREATE TABLE t(c ARRAY<STRUCT<c: $typeName(5)>>) USING $format")
sql("INSERT INTO t VALUES (array(struct(null)))")
checkAnswer(spark.table("t"), Row(Seq(Row(null))))
val e = intercept[SparkException](sql("INSERT INTO t VALUES (array(struct('123456')))"))
checkError(
exception = e.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
test("length check for input string values: nested in array of array") {
testTableWrite { typeName =>
sql(s"CREATE TABLE t(c ARRAY<ARRAY<$typeName(5)>>) USING $format")
sql("INSERT INTO t VALUES (array(array(null)))")
checkAnswer(spark.table("t"), Row(Seq(Seq(null))))
val e = intercept[SparkException](sql("INSERT INTO t VALUES (array(array('123456')))"))
checkError(
exception = e.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
test("length check for input string values: with trailing spaces") {
withTable("t") {
sql(s"CREATE TABLE t(c1 CHAR(5), c2 VARCHAR(5)) USING $format")
sql("INSERT INTO t VALUES ('12 ', '12 ')")
sql("INSERT INTO t VALUES ('1234 ', '1234 ')")
checkAnswer(spark.table("t"), Seq(
Row("12" + " " * 3, "12 "),
Row("1234 ", "1234 ")))
}
}
test("length check for input string values: with implicit cast") {
withTable("t") {
sql(s"CREATE TABLE t(c1 CHAR(5), c2 VARCHAR(5)) USING $format")
sql("INSERT INTO t VALUES (1234, 1234)")
checkAnswer(spark.table("t"), Row("1234 ", "1234"))
val e1 = intercept[SparkException](sql("INSERT INTO t VALUES (123456, 1)"))
checkError(
exception = e1.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
val e2 = intercept[SparkException](sql("INSERT INTO t VALUES (1, 123456)"))
checkError(
exception = e2.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
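// Evaluates each boolean expression in `conditions` against `df` and checks that it
// returns the expected result.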
private def testConditions(df: DataFrame, conditions: Seq[(String, Boolean)]): Unit = {
checkAnswer(df.selectExpr(conditions.map(_._1): _*), Row.fromSeq(conditions.map(_._2)))
}
test("char type comparison: top-level columns") {
withTable("t") {
sql(s"CREATE TABLE t(c1 CHAR(2), c2 CHAR(5)) USING $format")
sql("INSERT INTO t VALUES ('a', 'a')")
testConditions(spark.table("t"), Seq(
("c1 = 'a'", true),
("'a' = c1", true),
("c1 = 'a '", true),
("c1 > 'a'", false),
("c1 IN ('a', 'b')", true),
("c1 = c2", true),
("c1 < c2", false),
("c1 IN (c2)", true),
("c1 <=> null", false)))
}
}
test("char type comparison: partitioned columns") {
withTable("t") {
sql(s"CREATE TABLE t(i INT, c1 CHAR(2), c2 CHAR(5)) USING $format PARTITIONED BY (c1, c2)")
sql("INSERT INTO t VALUES (1, 'a', 'a')")
testConditions(spark.table("t"), Seq(
("c1 = 'a'", true),
("'a' = c1", true),
("c1 = 'a '", true),
("c1 > 'a'", false),
("c1 IN ('a', 'b')", true),
("c1 = c2", true),
("c1 < c2", false),
("c1 IN (c2)", true),
("c1 <=> null", false)))
}
}
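// Each condition compares a char column with null and must therefore evaluate to
// null (three-valued logic), not false.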
private def testNullConditions(df: DataFrame, conditions: Seq[String]): Unit = {
conditions.foreach { cond =>
checkAnswer(df.selectExpr(cond), Row(null))
}
}
test("SPARK-34233: char type comparison with null values") {
val conditions = Seq("c = null", "c IN ('e', null)", "c IN (null)")
withTable("t") {
sql(s"CREATE TABLE t(c CHAR(2)) USING $format")
sql("INSERT INTO t VALUES ('a')")
testNullConditions(spark.table("t"), conditions)
}
withTable("t") {
sql(s"CREATE TABLE t(i INT, c CHAR(2)) USING $format PARTITIONED BY (c)")
sql("INSERT INTO t VALUES (1, 'a')")
testNullConditions(spark.table("t"), conditions)
}
}
test("char type comparison: partition pruning") {
withTable("t") {
sql(s"CREATE TABLE t(i INT, c1 CHAR(2), c2 VARCHAR(5)) USING $format PARTITIONED BY (c1, c2)")
sql("INSERT INTO t VALUES (1, 'a', 'a')")
Seq(("c1 = 'a'", true),
("'a' = c1", true),
("c1 = 'a '", true),
("c1 > 'a'", false),
("c1 IN ('a', 'b')", true),
("c2 = 'a '", false),
("c2 = 'a'", true),
("c2 IN ('a', 'b')", true)).foreach { case (con, res) =>
val df = spark.table("t")
withClue(con) {
checkAnswer(df.where(con), df.where(res.toString))
}
}
}
}
test("char type comparison: join") {
withTable("t1", "t2") {
sql(s"CREATE TABLE t1(c CHAR(2)) USING $format")
sql(s"CREATE TABLE t2(c CHAR(5)) USING $format")
sql("INSERT INTO t1 VALUES ('a')")
sql("INSERT INTO t2 VALUES ('a')")
checkAnswer(sql("SELECT t1.c FROM t1 JOIN t2 ON t1.c = t2.c"), Row("a "))
}
}
test("char type comparison: nested in struct") {
withTable("t") {
sql(s"CREATE TABLE t(c1 STRUCT<c: CHAR(2)>, c2 STRUCT<c: CHAR(5)>) USING $format")
sql("INSERT INTO t VALUES (struct('a'), struct('a'))")
testConditions(spark.table("t"), Seq(
("c1 = c2", true),
("c1 < c2", false),
("c1 IN (c2)", true)))
}
}
test("char type comparison: nested in array") {
withTable("t") {
sql(s"CREATE TABLE t(c1 ARRAY<CHAR(2)>, c2 ARRAY<CHAR(5)>) USING $format")
sql("INSERT INTO t VALUES (array('a', 'b'), array('a', 'b'))")
testConditions(spark.table("t"), Seq(
("c1 = c2", true),
("c1 < c2", false),
("c1 IN (c2)", true)))
}
}
test("char type comparison: nested in struct of array") {
withTable("t") {
sql("CREATE TABLE t(c1 STRUCT<a: ARRAY<CHAR(2)>>, c2 STRUCT<a: ARRAY<CHAR(5)>>) " +
s"USING $format")
sql("INSERT INTO t VALUES (struct(array('a', 'b')), struct(array('a', 'b')))")
testConditions(spark.table("t"), Seq(
("c1 = c2", true),
("c1 < c2", false),
("c1 IN (c2)", true)))
}
}
test("char type comparison: nested in array of struct") {
withTable("t") {
sql("CREATE TABLE t(c1 ARRAY<STRUCT<c: CHAR(2)>>, c2 ARRAY<STRUCT<c: CHAR(5)>>) " +
s"USING $format")
sql("INSERT INTO t VALUES (array(struct('a')), array(struct('a')))")
testConditions(spark.table("t"), Seq(
("c1 = c2", true),
("c1 < c2", false),
("c1 IN (c2)", true)))
}
}
test("char type comparison: nested in array of array") {
withTable("t") {
sql("CREATE TABLE t(c1 ARRAY<ARRAY<CHAR(2)>>, c2 ARRAY<ARRAY<CHAR(5)>>) " +
s"USING $format")
sql("INSERT INTO t VALUES (array(array('a')), array(array('a')))")
testConditions(spark.table("t"), Seq(
("c1 = c2", true),
("c1 < c2", false),
("c1 IN (c2)", true)))
}
}
test("SPARK-33892: DESCRIBE TABLE w/ char/varchar") {
withTable("t") {
sql(s"CREATE TABLE t(v VARCHAR(3), c CHAR(5)) USING $format")
checkAnswer(sql("desc t").selectExpr("data_type").where("data_type like '%char%'"),
Seq(Row("char(5)"), Row("varchar(3)")))
}
}
test("SPARK-34003: fix char/varchar fails w/ both group by and order by ") {
withTable("t") {
sql(s"CREATE TABLE t(v VARCHAR(3), i INT) USING $format")
sql("INSERT INTO t VALUES ('c', 1)")
checkAnswer(sql("SELECT v, sum(i) FROM t GROUP BY v ORDER BY v"), Row("c", 1))
}
}
test("SPARK-34003: fix char/varchar fails w/ order by functions") {
withTable("t") {
sql(s"CREATE TABLE t(v VARCHAR(3), i INT) USING $format")
sql("INSERT INTO t VALUES ('c', 1)")
checkAnswer(sql("SELECT substr(v, 1, 2), sum(i) FROM t GROUP BY v ORDER BY substr(v, 1, 2)"),
Row("c", 1))
checkAnswer(sql("SELECT sum(i) FROM t GROUP BY v ORDER BY substr(v, 1, 2)"),
Row(1))
}
}
test("SPARK-34114: varchar type will strip tailing spaces to certain length at write time") {
withTable("t") {
sql(s"CREATE TABLE t(v VARCHAR(3)) USING $format")
sql("INSERT INTO t VALUES ('c ')")
checkAnswer(spark.table("t"), Row("c "))
}
}
test("SPARK-34114: varchar type will remain the value length with spaces at read time") {
withTable("t") {
sql(s"CREATE TABLE t(v VARCHAR(3)) USING $format")
sql("INSERT INTO t VALUES ('c ')")
checkAnswer(spark.table("t"), Row("c "))
}
}
test("SPARK-34833: right-padding applied correctly for correlated subqueries - join keys") {
withTable("t1", "t2") {
sql(s"CREATE TABLE t1(v VARCHAR(3), c CHAR(5)) USING $format")
sql(s"CREATE TABLE t2(v VARCHAR(5), c CHAR(8)) USING $format")
sql("INSERT INTO t1 VALUES ('c', 'b')")
sql("INSERT INTO t2 VALUES ('a', 'b')")
Seq("t1.c = t2.c", "t2.c = t1.c",
"t1.c = 'b'", "'b' = t1.c", "t1.c = 'b '", "'b ' = t1.c",
"t1.c = 'b '", "'b ' = t1.c").foreach { predicate =>
checkAnswer(sql(
s"""
|SELECT v FROM t1
|WHERE 'a' IN (SELECT v FROM t2 WHERE $predicate)
""".stripMargin),
Row("c"))
}
}
}
test("SPARK-34833: right-padding applied correctly for correlated subqueries - other preds") {
withTable("t") {
sql(s"CREATE TABLE t(c0 INT, c1 CHAR(5), c2 CHAR(7)) USING $format")
sql("INSERT INTO t VALUES (1, 'abc', 'abc')")
Seq("c1 = 'abc'", "'abc' = c1", "c1 = 'abc '", "'abc ' = c1",
"c1 = 'abc '", "'abc ' = c1", "c1 = c2", "c2 = c1",
"c1 IN ('xxx', 'abc', 'xxxxx')", "c1 IN ('xxx', 'abc ', 'xxxxx')",
"c1 IN ('xxx', 'abc ', 'xxxxx')",
"c1 IN (c2)", "c2 IN (c1)").foreach { predicate =>
checkAnswer(sql(
s"""
|SELECT c0 FROM t t1
|WHERE (
| SELECT count(*) AS c
| FROM t
| WHERE c0 = t1.c0 AND $predicate
|) > 0
""".stripMargin),
Row(1))
}
}
}
test("SPARK-35359: create table and insert data over length values") {
Seq("char", "varchar").foreach { typ =>
withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
withTable("t") {
sql(s"CREATE TABLE t (col $typ(2)) using $format")
sql("INSERT INTO t SELECT 'aaa'")
checkAnswer(sql("select * from t"), Row("aaa"))
}
}
}
}
}
// Some basic char/varchar tests which don't rely on a table implementation.
class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
import testImplicits._
test("user-specified schema in cast") {
def assertNoCharType(df: DataFrame): Unit = {
checkAnswer(df, Row("0"))
assert(df.schema.map(_.dataType) == Seq(StringType))
}
val logAppender = new LogAppender("The Spark cast operator does not support char/varchar" +
" type and simply treats them as string type. Please use string type directly to avoid" +
" confusion.")
withLogAppender(logAppender) {
assertNoCharType(spark.range(1).select($"id".cast("char(5)")))
assertNoCharType(spark.range(1).select($"id".cast(CharType(5))))
assertNoCharType(spark.range(1).selectExpr("CAST(id AS CHAR(5))"))
assertNoCharType(sql("SELECT CAST(id AS CHAR(5)) FROM range(1)"))
}
}
test("invalidate char/varchar in functions") {
checkError(
exception = intercept[AnalysisException] {
sql("""SELECT from_json('{"a": "str"}', 'a CHAR(5)')""")
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
parameters = Map.empty,
context = ExpectedContext(
fragment = "from_json('{\"a\": \"str\"}', 'a CHAR(5)')",
start = 7,
stop = 44)
)
withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
val df = sql("""SELECT from_json('{"a": "str"}', 'a CHAR(5)')""")
checkAnswer(df, Row(Row("str")))
val schema = df.schema.head.dataType.asInstanceOf[StructType]
assert(schema.map(_.dataType) == Seq(StringType))
}
}
test("invalidate char/varchar in SparkSession createDataframe") {
val df = spark.range(10).map(_.toString).toDF()
val schema = new StructType().add("id", CharType(5))
checkError(
exception = intercept[AnalysisException] {
spark.createDataFrame(df.collectAsList(), schema)
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
checkError(
exception = intercept[AnalysisException] {
spark.createDataFrame(df.rdd, schema)
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
checkError(
exception = intercept[AnalysisException] {
spark.createDataFrame(df.toJavaRDD, schema)
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
val df1 = spark.createDataFrame(df.collectAsList(), schema)
checkAnswer(df1, df)
assert(df1.schema.head.dataType === StringType)
}
}
test("invalidate char/varchar in spark.read.schema") {
checkError(
exception = intercept[AnalysisException] {
spark.read.schema(new StructType().add("id", CharType(5)))
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING")
checkError(
exception = intercept[AnalysisException] {
spark.read.schema("id char(5)")
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
val ds = spark.range(10).map(_.toString)
val df1 = spark.read.schema(new StructType().add("id", CharType(5))).csv(ds)
assert(df1.schema.map(_.dataType) == Seq(StringType))
val df2 = spark.read.schema("id char(5)").csv(ds)
assert(df2.schema.map(_.dataType) == Seq(StringType))
def checkSchema(df: DataFrame): Unit = {
val schemas = df.queryExecution.analyzed.collect {
case l: LogicalRelation => l.relation.schema
case d: DataSourceV2Relation => d.table.schema()
}
assert(schemas.length == 1)
assert(schemas.head.map(_.dataType) == Seq(StringType))
}
// user-specified schema in DataFrameReader: DSV1
checkSchema(spark.read.schema(new StructType().add("id", CharType(5)))
.format(classOf[SimpleInsertSource].getName).load())
checkSchema(spark.read.schema("id char(5)")
.format(classOf[SimpleInsertSource].getName).load())
// user-specified schema in DataFrameReader: DSV2
checkSchema(spark.read.schema(new StructType().add("id", CharType(5)))
.format(classOf[SchemaRequiredDataSource].getName).load())
checkSchema(spark.read.schema("id char(5)")
.format(classOf[SchemaRequiredDataSource].getName).load())
}
}
test("invalidate char/varchar in udf's result type") {
checkError(
exception = intercept[AnalysisException] {
spark.udf.register("testchar", () => "B", VarcharType(1))
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
checkError(
exception = intercept[AnalysisException] {
spark.udf.register("testchar2", (x: String) => x, VarcharType(1))
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
spark.udf.register("testchar", () => "B", VarcharType(1))
spark.udf.register("testchar2", (x: String) => x, VarcharType(1))
val df1 = spark.sql("select testchar()")
checkAnswer(df1, Row("B"))
assert(df1.schema.head.dataType === StringType)
val df2 = spark.sql("select testchar2('abc')")
checkAnswer(df2, Row("abc"))
assert(df2.schema.head.dataType === StringType)
}
}
test("invalidate char/varchar in spark.readStream.schema") {
checkError(
exception = intercept[AnalysisException] {
spark.readStream.schema(new StructType().add("id", CharType(5)))
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
checkError(
exception = intercept[AnalysisException] {
spark.readStream.schema("id char(5)")
},
errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
)
withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
withTempPath { dir =>
spark.range(2).write.save(dir.toString)
val df1 = spark.readStream.schema(new StructType().add("id", CharType(5)))
.load(dir.toString)
assert(df1.schema.map(_.dataType) == Seq(StringType))
val df2 = spark.readStream.schema("id char(5)").load(dir.toString)
assert(df2.schema.map(_.dataType) == Seq(StringType))
}
}
}
test("SPARK-44409: Handle char/varchar in Dataset.to to keep consistent with others") {
val newSchema = StructType.fromDDL("v varchar(255), c char(10)")
withTable("t") {
sql("CREATE TABLE t(c char(10), v varchar(255)) USING parquet")
sql("INSERT INTO t VALUES('spark', 'awesome')")
val df = sql("SELECT * FROM t")
checkError(exception = intercept[AnalysisException] {
df.to(newSchema)
}, errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING", parameters = Map.empty)
withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
val df1 = df.to(newSchema)
checkAnswer(df1, df.select("v", "c"))
assert(df1.schema.last.dataType === StringType)
}
}
}
}
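// Runs the shared char/varchar tests against the built-in (V1) parquet file source.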
class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSparkSession {
override def format: String = "parquet"
override protected def sparkConf: SparkConf = {
super.sparkConf.set(SQLConf.USE_V1_SOURCE_LIST, "parquet")
}
test("create table w/ location and fit length values") {
withTempPath { dir =>
withTable("t") {
sql("SELECT '12' as col1, '12' as col2").write.format(format).save(dir.toString)
sql(s"CREATE TABLE t (col1 char(3), col2 varchar(3)) using $format LOCATION '$dir'")
checkAnswer(sql("select * from t"), Row("12 ", "12"))
}
}
}
test("create table w/ location and over length values") {
Seq("char", "varchar").foreach { typ =>
withTempPath { dir =>
withTable("t") {
sql("SELECT '123456' as col").write.format(format).save(dir.toString)
sql(s"CREATE TABLE t (col $typ(2)) using $format LOCATION '$dir'")
checkAnswer(sql("select * from t"), Row("123456"))
}
}
}
}
test("alter table set location w/ fit length values") {
withTempPath { dir =>
withTable("t") {
sql("SELECT '12' as col1, '12' as col2").write.format(format).save(dir.toString)
sql(s"CREATE TABLE t (col1 char(3), col2 varchar(3)) using $format")
sql(s"ALTER TABLE t SET LOCATION '$dir'")
checkAnswer(spark.table("t"), Row("12 ", "12"))
}
}
}
test("alter table set location w/ over length values") {
Seq("char", "varchar").foreach { typ =>
withTempPath { dir =>
withTable("t") {
sql("SELECT '123456' as col").write.format(format).save(dir.toString)
sql(s"CREATE TABLE t (col $typ(2)) using $format")
sql(s"ALTER TABLE t SET LOCATION '$dir'")
checkAnswer(spark.table("t"), Row("123456"))
}
}
}
}
test("SPARK-34114: should not trim right for read-side length check and char padding") {
Seq("char", "varchar").foreach { typ =>
withTempPath { dir =>
withTable("t") {
sql("SELECT '12 ' as col").write.format(format).save(dir.toString)
sql(s"CREATE TABLE t (col $typ(2)) using $format LOCATION '$dir'")
checkAnswer(spark.table("t"), Row("12 "))
}
}
}
}
test("SPARK-40697: read-side char padding should only be applied if necessary") {
withTable("t") {
sql(
s"""
|CREATE TABLE t (
| c1 CHAR(5),
| c2 STRUCT<i VARCHAR(5)>,
| c3 ARRAY<VARCHAR(5)>,
| c4 MAP<INT, VARCHAR(5)>
|) USING $format
|""".stripMargin)
spark.read.table("t").queryExecution.analyzed.foreach {
case Project(projectList, _) =>
assert(projectList.length == 4)
assert(projectList.drop(1).forall(_.isInstanceOf[Attribute]))
case _ =>
}
}
}
test("char/varchar type values length check: partitioned columns of other types") {
val tableName = "t"
Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
withTable(tableName) {
sql(s"CREATE TABLE $tableName(i STRING, c $typ) USING $format PARTITIONED BY (c)")
Seq(1, 10, 100, 1000, 10000).foreach { v =>
sql(s"INSERT OVERWRITE $tableName VALUES ('1', $v)")
checkPlainResult(spark.table(tableName), typ, v.toString)
sql(s"ALTER TABLE $tableName DROP PARTITION(c=$v)")
checkAnswer(spark.table(tableName), Nil)
}
val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE $tableName VALUES ('1', 100000)"))
checkError(
exception = e1.getCause.asInstanceOf[SparkException],
errorClass = "TASK_WRITE_FAILED",
parameters = Map("path" -> s".*$tableName"),
matchPVals = true
)
checkError(
exception = intercept[SparkRuntimeException] {
sql("ALTER TABLE t DROP PARTITION(c=100000)")
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
}
}
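// Runs the shared char/varchar tests against an in-memory V2 catalog that supports
// partitioned tables.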
class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
with SharedSparkSession {
override def format: String = "foo"
protected override def sparkConf = {
super.sparkConf
.set("spark.sql.catalog.testcat", classOf[InMemoryPartitionTableCatalog].getName)
.set(SQLConf.DEFAULT_CATALOG.key, "testcat")
}
test("char/varchar type values length check: partitioned columns of other types") {
Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY (c)")
Seq(1, 10, 100, 1000, 10000).foreach { v =>
sql(s"INSERT OVERWRITE t VALUES ('1', $v)")
checkPlainResult(spark.table("t"), typ, v.toString)
sql(s"ALTER TABLE t DROP PARTITION(c=$v)")
checkAnswer(spark.table("t"), Nil)
}
val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE t VALUES ('1', 100000)"))
checkError(
exception = e1.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
checkError(
exception = intercept[SparkRuntimeException] {
sql("ALTER TABLE t DROP PARTITION(c=100000)")
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
}
test("SPARK-42611: check char/varchar length in reordered nested structs") {
Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
withTable("t") {
sql(s"CREATE TABLE t(s STRUCT<n_c: $typ, n_i: INT>) USING $format")
val inputDF = sql("SELECT named_struct('n_i', 1, 'n_c', '123456') AS s")
checkError(
exception = intercept[SparkRuntimeException] {
inputDF.writeTo("t").append()
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
}
test("SPARK-42611: check char/varchar length in reordered structs within arrays") {
Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
withTable("t") {
sql(s"CREATE TABLE t(a ARRAY<STRUCT<n_c: $typ, n_i: INT>>) USING $format")
val inputDF = sql("SELECT array(named_struct('n_i', 1, 'n_c', '123456')) AS a")
val e = intercept[SparkException](inputDF.writeTo("t").append())
checkError(
exception = e.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
}
test("SPARK-42611: check char/varchar length in reordered structs within map keys") {
Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
withTable("t") {
sql(s"CREATE TABLE t(m MAP<STRUCT<n_c: $typ, n_i: INT>, INT>) USING $format")
val inputDF = sql("SELECT map(named_struct('n_i', 1, 'n_c', '123456'), 1) AS m")
val e = intercept[SparkException](inputDF.writeTo("t").append())
checkError(
exception = e.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
}
test("SPARK-42611: check char/varchar length in reordered structs within map values") {
Seq("CHAR(5)", "VARCHAR(5)").foreach { typ =>
withTable("t") {
sql(s"CREATE TABLE t(m MAP<INT, STRUCT<n_c: $typ, n_i: INT>>) USING $format")
val inputDF = sql("SELECT map(1, named_struct('n_i', 1, 'n_c', '123456')) AS m")
val e = intercept[SparkException](inputDF.writeTo("t").append())
checkError(
exception = e.getCause match {
case c: SparkRuntimeException => c
case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
},
errorClass = "EXCEED_LIMIT_LENGTH",
parameters = Map("limit" -> "5")
)
}
}
}
}