blob: 298c07059ff44f4df354c38b85f68415bdcd0375 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connector
import java.sql.Timestamp
import java.time.LocalDate
import scala.collection.JavaConverters._
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION}
import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.sources.SimpleScanSource
import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
class DataSourceV2SQLSuite
extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = true)
with AlterTableTests {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
private val v2Source = classOf[FakeV2Provider].getName
override protected val v2Format = v2Source
override protected val catalogAndNamespace = "testcat.ns1.ns2."
private val defaultUser: String = Utils.getCurrentUserName()
private def catalog(name: String): CatalogPlugin = {
spark.sessionState.catalogManager.catalog(name)
}
protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
val tmpView = "tmp_view"
withTempView(tmpView) {
insert.createOrReplaceTempView(tmpView)
val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO"
sql(s"INSERT $overwrite TABLE $tableName SELECT * FROM $tmpView")
}
}
override def verifyTable(tableName: String, expected: DataFrame): Unit = {
checkAnswer(spark.table(tableName), expected)
}
override def getTableMetadata(tableName: String): Table = {
val nameParts = spark.sessionState.sqlParser.parseMultipartIdentifier(tableName)
val v2Catalog = catalog(nameParts.head).asTableCatalog
val namespace = nameParts.drop(1).init.toArray
v2Catalog.loadTable(Identifier.of(namespace, nameParts.last))
}
before {
spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
spark.conf.set(
"spark.sql.catalog.testcat_atomic", classOf[StagingInMemoryTableCatalog].getName)
spark.conf.set("spark.sql.catalog.testcat2", classOf[InMemoryTableCatalog].getName)
spark.conf.set(
V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[InMemoryTableSessionCatalog].getName)
val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
df.createOrReplaceTempView("source")
val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data")
df2.createOrReplaceTempView("source2")
}
after {
spark.sessionState.catalog.reset()
spark.sessionState.catalogManager.reset()
spark.sessionState.conf.clear()
}
test("CreateTable: use v2 plan because catalog is set") {
spark.sql("CREATE TABLE testcat.table_name (id bigint NOT NULL, data string) USING foo")
val testCatalog = catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType()
.add("id", LongType, nullable = false)
.add("data", StringType))
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
}
test("DescribeTable using v2 catalog") {
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string)" +
" USING foo" +
" PARTITIONED BY (id)")
val descriptionDf = spark.sql("DESCRIBE TABLE testcat.table_name")
assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
Seq(
("col_name", StringType),
("data_type", StringType),
("comment", StringType)))
val description = descriptionDf.collect()
assert(description === Seq(
Row("id", "bigint", ""),
Row("data", "string", ""),
Row("", "", ""),
Row("# Partitioning", "", ""),
Row("Part 0", "id", "")))
val e = intercept[AnalysisException] {
sql("DESCRIBE TABLE testcat.table_name PARTITION (id = 1)")
}
assert(e.message.contains("DESCRIBE does not support partition for v2 tables"))
}
test("DescribeTable with v2 catalog when table does not exist.") {
intercept[AnalysisException] {
spark.sql("DESCRIBE TABLE testcat.table_name")
}
}
test("DescribeTable extended using v2 catalog") {
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string)" +
" USING foo" +
" PARTITIONED BY (id)" +
" TBLPROPERTIES ('bar'='baz')" +
" COMMENT 'this is a test table'" +
" LOCATION '/tmp/testcat/table_name'")
val descriptionDf = spark.sql("DESCRIBE TABLE EXTENDED testcat.table_name")
assert(descriptionDf.schema.map(field => (field.name, field.dataType))
=== Seq(
("col_name", StringType),
("data_type", StringType),
("comment", StringType)))
assert(descriptionDf.collect()
.map(_.toSeq)
.map(_.toArray.map(_.toString.trim)) === Array(
Array("id", "bigint", ""),
Array("data", "string", ""),
Array("", "", ""),
Array("# Partitioning", "", ""),
Array("Part 0", "id", ""),
Array("", "", ""),
Array("# Detailed Table Information", "", ""),
Array("Name", "testcat.table_name", ""),
Array("Comment", "this is a test table", ""),
Array("Location", "/tmp/testcat/table_name", ""),
Array("Provider", "foo", ""),
Array(TableCatalog.PROP_OWNER.capitalize, defaultUser, ""),
Array("Table Properties", "[bar=baz]", "")))
}
test("Describe column is not supported for v2 catalog") {
withTable("testcat.tbl") {
spark.sql("CREATE TABLE testcat.tbl (id bigint) USING foo")
val ex = intercept[AnalysisException] {
spark.sql("DESCRIBE testcat.tbl id")
}
assert(ex.message.contains("Describing columns is not supported for v2 tables"))
}
}
test("SPARK-33004: Describe column should resolve to a temporary view first") {
withTable("testcat.ns.t") {
withTempView("t") {
sql("CREATE TABLE testcat.ns.t (id bigint) USING foo")
sql("CREATE TEMPORARY VIEW t AS SELECT 2 as i")
sql("USE testcat.ns")
checkAnswer(
sql("DESCRIBE t i"),
Seq(Row("col_name", "i"),
Row("data_type", "int"),
Row("comment", "NULL")))
}
}
}
test("CreateTable: use v2 plan and session catalog when provider is v2") {
spark.sql(s"CREATE TABLE table_name (id bigint, data string) USING $v2Source")
val testCatalog = catalog(SESSION_CATALOG_NAME).asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array("default"), "table_name"))
assert(table.name == "default.table_name")
assert(table.partitioning.isEmpty)
assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava)
assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
}
test("CreateTable: fail if table exists") {
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")
val testCatalog = catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
// run a second create query that should fail
val exc = intercept[TableAlreadyExistsException] {
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string, id2 bigint) USING bar")
}
assert(exc.getMessage.contains("table_name"))
// table should not have changed
val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table2.name == "testcat.table_name")
assert(table2.partitioning.isEmpty)
assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table2.schema == new StructType().add("id", LongType).add("data", StringType))
// check that the table is still empty
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
}
test("CreateTable: if not exists") {
spark.sql(
"CREATE TABLE IF NOT EXISTS testcat.table_name (id bigint, data string) USING foo")
val testCatalog = catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
spark.sql("CREATE TABLE IF NOT EXISTS testcat.table_name (id bigint, data string) USING bar")
// table should not have changed
val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table2.name == "testcat.table_name")
assert(table2.partitioning.isEmpty)
assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table2.schema == new StructType().add("id", LongType).add("data", StringType))
// check that the table is still empty
val rdd2 = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd2, table.schema), Seq.empty)
}
test("CreateTable: use default catalog for v2 sources when default catalog is set") {
spark.conf.set(SQLConf.DEFAULT_CATALOG.key, "testcat")
spark.sql(s"CREATE TABLE table_name (id bigint, data string) USING foo")
val testCatalog = catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
// check that the table is empty
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
}
// TODO: ignored by SPARK-31707, restore the test after create table syntax unification
ignore("CreateTable: without USING clause") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
val testCatalog = catalog("testcat").asTableCatalog
sql("CREATE TABLE testcat.t1 (id int)")
val t1 = testCatalog.loadTable(Identifier.of(Array(), "t1"))
// Spark shouldn't set the default provider for catalog plugins.
assert(!t1.properties.containsKey(TableCatalog.PROP_PROVIDER))
sql("CREATE TABLE t2 (id int)")
val t2 = spark.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
.loadTable(Identifier.of(Array("default"), "t2")).asInstanceOf[V1Table]
// Spark should set the default provider as DEFAULT_DATA_SOURCE_NAME for the session catalog.
assert(t2.v1Table.provider == Some(conf.defaultDataSourceName))
}
test("CreateTable/RepalceTable: invalid schema if has interval type") {
Seq("CREATE", "REPLACE").foreach { action =>
val e1 = intercept[AnalysisException](
sql(s"$action TABLE table_name (id int, value interval) USING $v2Format"))
assert(e1.getMessage.contains(s"Cannot use interval type in the table schema."))
val e2 = intercept[AnalysisException](
sql(s"$action TABLE table_name (id array<interval>) USING $v2Format"))
assert(e2.getMessage.contains(s"Cannot use interval type in the table schema."))
}
}
test("CTAS/RTAS: invalid schema if has interval type") {
Seq("CREATE", "REPLACE").foreach { action =>
val e1 = intercept[AnalysisException](
sql(s"$action TABLE table_name USING $v2Format as select interval 1 day"))
assert(e1.getMessage.contains(s"Cannot use interval type in the table schema."))
val e2 = intercept[AnalysisException](
sql(s"$action TABLE table_name USING $v2Format as select array(interval 1 day)"))
assert(e2.getMessage.contains(s"Cannot use interval type in the table schema."))
}
}
test("CreateTableAsSelect: use v2 plan because catalog is set") {
val basicCatalog = catalog("testcat").asTableCatalog
val atomicCatalog = catalog("testcat_atomic").asTableCatalog
val basicIdentifier = "testcat.table_name"
val atomicIdentifier = "testcat_atomic.table_name"
Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
case (catalog, identifier) =>
spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM source")
val table = catalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name == identifier)
assert(table.partitioning.isEmpty)
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType()
.add("id", LongType)
.add("data", StringType))
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
}
}
test("CreateTableAsSelect: do not double execute on collect(), take() and other queries") {
val basicCatalog = catalog("testcat").asTableCatalog
val atomicCatalog = catalog("testcat_atomic").asTableCatalog
val basicIdentifier = "testcat.table_name"
val atomicIdentifier = "testcat_atomic.table_name"
Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
case (catalog, identifier) =>
val df = spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM source")
df.collect()
df.take(5)
df.tail(5)
df.where("true").collect()
df.where("true").take(5)
df.where("true").tail(5)
val table = catalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name == identifier)
assert(table.partitioning.isEmpty)
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType()
.add("id", LongType)
.add("data", StringType))
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
}
}
test("ReplaceTableAsSelect: basic v2 implementation.") {
val basicCatalog = catalog("testcat").asTableCatalog
val atomicCatalog = catalog("testcat_atomic").asTableCatalog
val basicIdentifier = "testcat.table_name"
val atomicIdentifier = "testcat_atomic.table_name"
Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
case (catalog, identifier) =>
spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM source")
val originalTable = catalog.loadTable(Identifier.of(Array(), "table_name"))
spark.sql(s"REPLACE TABLE $identifier USING foo AS SELECT id FROM source")
val replacedTable = catalog.loadTable(Identifier.of(Array(), "table_name"))
assert(replacedTable != originalTable, "Table should have been replaced.")
assert(replacedTable.name == identifier)
assert(replacedTable.partitioning.isEmpty)
assert(replacedTable.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(replacedTable.schema == new StructType().add("id", LongType))
val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
checkAnswer(
spark.internalCreateDataFrame(rdd, replacedTable.schema),
spark.table("source").select("id"))
}
}
Seq("REPLACE", "CREATE OR REPLACE").foreach { cmd =>
test(s"ReplaceTableAsSelect: do not double execute $cmd on collect()") {
val basicCatalog = catalog("testcat").asTableCatalog
val atomicCatalog = catalog("testcat_atomic").asTableCatalog
val basicIdentifier = "testcat.table_name"
val atomicIdentifier = "testcat_atomic.table_name"
Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
case (catalog, identifier) =>
spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM source")
val originalTable = catalog.loadTable(Identifier.of(Array(), "table_name"))
val df = spark.sql(s"$cmd TABLE $identifier USING foo AS SELECT id FROM source")
df.collect()
df.take(5)
df.tail(5)
df.where("true").collect()
df.where("true").take(5)
df.where("true").tail(5)
val replacedTable = catalog.loadTable(Identifier.of(Array(), "table_name"))
assert(replacedTable != originalTable, "Table should have been replaced.")
assert(replacedTable.name == identifier)
assert(replacedTable.partitioning.isEmpty)
assert(replacedTable.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(replacedTable.schema == new StructType().add("id", LongType))
val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
checkAnswer(
spark.internalCreateDataFrame(rdd, replacedTable.schema),
spark.table("source").select("id"))
}
}
}
test("ReplaceTableAsSelect: Non-atomic catalog drops the table if the write fails.") {
spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source")
val testCatalog = catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
intercept[Exception] {
spark.sql("REPLACE TABLE testcat.table_name" +
s" USING foo OPTIONS (`${InMemoryTable.SIMULATE_FAILED_WRITE_OPTION}`=true)" +
s" AS SELECT id FROM source")
}
assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")),
"Table should have been dropped as a result of the replace.")
}
test("ReplaceTableAsSelect: Non-atomic catalog drops the table permanently if the" +
" subsequent table creation fails.") {
spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source")
val testCatalog = catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
intercept[Exception] {
spark.sql("REPLACE TABLE testcat.table_name" +
s" USING foo" +
s" TBLPROPERTIES (`${InMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" +
s" AS SELECT id FROM source")
}
assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")),
"Table should have been dropped and failed to be created.")
}
test("ReplaceTableAsSelect: Atomic catalog does not drop the table when replace fails.") {
spark.sql("CREATE TABLE testcat_atomic.table_name USING foo AS SELECT id, data FROM source")
val testCatalog = catalog("testcat_atomic").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
intercept[Exception] {
spark.sql("REPLACE TABLE testcat_atomic.table_name" +
s" USING foo OPTIONS (`${InMemoryTable.SIMULATE_FAILED_WRITE_OPTION}=true)" +
s" AS SELECT id FROM source")
}
var maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(maybeReplacedTable === table, "Table should not have changed.")
intercept[Exception] {
spark.sql("REPLACE TABLE testcat_atomic.table_name" +
s" USING foo" +
s" TBLPROPERTIES (`${InMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" +
s" AS SELECT id FROM source")
}
maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(maybeReplacedTable === table, "Table should not have changed.")
}
test("ReplaceTable: Erases the table contents and changes the metadata.") {
spark.sql(s"CREATE TABLE testcat.table_name USING $v2Source AS SELECT id, data FROM source")
val testCatalog = catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
spark.sql("REPLACE TABLE testcat.table_name (id bigint NOT NULL) USING foo")
val replaced = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(replaced.asInstanceOf[InMemoryTable].rows.isEmpty,
"Replaced table should have no rows after committing.")
assert(replaced.schema().fields.length === 1,
"Replaced table should have new schema.")
assert(replaced.schema().fields(0) === StructField("id", LongType, nullable = false),
"Replaced table should have new schema.")
}
test("ReplaceTableAsSelect: CREATE OR REPLACE new table has same behavior as CTAS.") {
Seq("testcat", "testcat_atomic").foreach { catalogName =>
spark.sql(
s"""
|CREATE TABLE $catalogName.created USING $v2Source
|AS SELECT id, data FROM source
""".stripMargin)
spark.sql(
s"""
|CREATE OR REPLACE TABLE $catalogName.replaced USING $v2Source
|AS SELECT id, data FROM source
""".stripMargin)
val testCatalog = catalog(catalogName).asTableCatalog
val createdTable = testCatalog.loadTable(Identifier.of(Array(), "created"))
val replacedTable = testCatalog.loadTable(Identifier.of(Array(), "replaced"))
assert(createdTable.asInstanceOf[InMemoryTable].rows ===
replacedTable.asInstanceOf[InMemoryTable].rows)
assert(createdTable.schema === replacedTable.schema)
}
}
test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table does not exist.") {
Seq("testcat", "testcat_atomic").foreach { catalog =>
spark.sql(s"CREATE TABLE $catalog.created USING $v2Source AS SELECT id, data FROM source")
intercept[CannotReplaceMissingTableException] {
spark.sql(s"REPLACE TABLE $catalog.replaced USING $v2Source AS SELECT id, data FROM source")
}
}
}
test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table is dropped before commit.") {
import InMemoryTableCatalog._
spark.sql(s"CREATE TABLE testcat_atomic.created USING $v2Source AS SELECT id, data FROM source")
intercept[CannotReplaceMissingTableException] {
spark.sql(s"REPLACE TABLE testcat_atomic.replaced" +
s" USING $v2Source" +
s" TBLPROPERTIES (`$SIMULATE_DROP_BEFORE_REPLACE_PROPERTY`=true)" +
s" AS SELECT id, data FROM source")
}
}
test("CreateTableAsSelect: use v2 plan and session catalog when provider is v2") {
spark.sql(s"CREATE TABLE table_name USING $v2Source AS SELECT id, data FROM source")
val testCatalog = catalog(SESSION_CATALOG_NAME).asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array("default"), "table_name"))
assert(table.name == "default.table_name")
assert(table.partitioning.isEmpty)
assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava)
assert(table.schema == new StructType()
.add("id", LongType)
.add("data", StringType))
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
}
test("CreateTableAsSelect: fail if table exists") {
spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source")
val testCatalog = catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType()
.add("id", LongType)
.add("data", StringType))
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
// run a second CTAS query that should fail
val exc = intercept[TableAlreadyExistsException] {
spark.sql(
"CREATE TABLE testcat.table_name USING bar AS SELECT id, data, id as id2 FROM source2")
}
assert(exc.getMessage.contains("table_name"))
// table should not have changed
val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table2.name == "testcat.table_name")
assert(table2.partitioning.isEmpty)
assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table2.schema == new StructType()
.add("id", LongType)
.add("data", StringType))
val rdd2 = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd2, table.schema), spark.table("source"))
}
test("CreateTableAsSelect: if not exists") {
spark.sql(
"CREATE TABLE IF NOT EXISTS testcat.table_name USING foo AS SELECT id, data FROM source")
val testCatalog = catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType()
.add("id", LongType)
.add("data", StringType))
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
spark.sql(
"CREATE TABLE IF NOT EXISTS testcat.table_name USING foo AS SELECT id, data FROM source2")
// check that the table contains data from just the first CTAS
val rdd2 = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd2, table.schema), spark.table("source"))
}
test("CreateTableAsSelect: use default catalog for v2 sources when default catalog is set") {
spark.conf.set(SQLConf.DEFAULT_CATALOG.key, "testcat")
val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
df.createOrReplaceTempView("source")
// setting the default catalog breaks the reference to source because the default catalog is
// used and AsTableIdentifier no longer matches
spark.sql(s"CREATE TABLE table_name USING foo AS SELECT id, data FROM source")
val testCatalog = catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType()
.add("id", LongType)
.add("data", StringType))
val rdd = sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
}
test("CreateTableAsSelect: v2 session catalog can load v1 source table") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
df.createOrReplaceTempView("source")
sql(s"CREATE TABLE table_name USING parquet AS SELECT id, data FROM source")
checkAnswer(sql(s"TABLE default.table_name"), spark.table("source"))
// The fact that the following line doesn't throw an exception means, the session catalog
// can load the table.
val t = catalog(SESSION_CATALOG_NAME).asTableCatalog
.loadTable(Identifier.of(Array("default"), "table_name"))
assert(t.isInstanceOf[V1Table], "V1 table wasn't returned as an unresolved table")
}
test("CreateTableAsSelect: nullable schema") {
val basicCatalog = catalog("testcat").asTableCatalog
val atomicCatalog = catalog("testcat_atomic").asTableCatalog
val basicIdentifier = "testcat.table_name"
val atomicIdentifier = "testcat_atomic.table_name"
Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
case (catalog, identifier) =>
spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT 1 i")
val table = catalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name == identifier)
assert(table.partitioning.isEmpty)
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
assert(table.schema == new StructType().add("i", "int"))
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Row(1))
sql(s"INSERT INTO $identifier SELECT CAST(null AS INT)")
val rdd2 = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd2, table.schema), Seq(Row(1), Row(null)))
}
}
// TODO: ignored by SPARK-31707, restore the test after create table syntax unification
ignore("CreateTableAsSelect: without USING clause") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
val testCatalog = catalog("testcat").asTableCatalog
sql("CREATE TABLE testcat.t1 AS SELECT 1 i")
val t1 = testCatalog.loadTable(Identifier.of(Array(), "t1"))
// Spark shouldn't set the default provider for catalog plugins.
assert(!t1.properties.containsKey(TableCatalog.PROP_PROVIDER))
sql("CREATE TABLE t2 AS SELECT 1 i")
val t2 = spark.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
.loadTable(Identifier.of(Array("default"), "t2")).asInstanceOf[V1Table]
// Spark should set the default provider as DEFAULT_DATA_SOURCE_NAME for the session catalog.
assert(t2.v1Table.provider == Some(conf.defaultDataSourceName))
}
test("DropTable: basic") {
val tableName = "testcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
sql(s"CREATE TABLE $tableName USING foo AS SELECT id, data FROM source")
assert(catalog("testcat").asTableCatalog.tableExists(ident) === true)
sql(s"DROP TABLE $tableName")
assert(catalog("testcat").asTableCatalog.tableExists(ident) === false)
}
test("DropTable: table qualified with the session catalog name") {
val ident = Identifier.of(Array("default"), "tbl")
sql("CREATE TABLE tbl USING json AS SELECT 1 AS i")
assert(catalog("spark_catalog").asTableCatalog.tableExists(ident) === true)
sql("DROP TABLE spark_catalog.default.tbl")
assert(catalog("spark_catalog").asTableCatalog.tableExists(ident) === false)
}
test("DropTable: if exists") {
val ex = intercept[AnalysisException] {
sql("DROP TABLE testcat.db.notbl")
}
assert(ex.getMessage.contains("Table or view not found: testcat.db.notbl"))
sql("DROP TABLE IF EXISTS testcat.db.notbl")
}
test("SPARK-33174: DROP TABLE should resolve to a temporary view first") {
withTable("testcat.ns.t") {
withTempView("t") {
sql("CREATE TABLE testcat.ns.t (id bigint) USING foo")
sql("CREATE TEMPORARY VIEW t AS SELECT 2")
sql("USE testcat.ns")
// Check the temporary view 't' exists.
runShowTablesSql(
"SHOW TABLES FROM spark_catalog.default LIKE 't'",
Seq(Row("", "t", true)),
expectV2Catalog = false)
sql("DROP TABLE t")
// Verify that the temporary view 't' is resolved first and dropped.
runShowTablesSql(
"SHOW TABLES FROM spark_catalog.default LIKE 't'",
Nil,
expectV2Catalog = false)
}
}
}
test("Relation: basic") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
checkAnswer(sql(s"TABLE $t1"), spark.table("source"))
checkAnswer(sql(s"SELECT * FROM $t1"), spark.table("source"))
}
}
test("Relation: SparkSession.table()") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
checkAnswer(spark.table(s"$t1"), spark.table("source"))
}
}
test("Relation: CTE") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
checkAnswer(
sql(s"""
|WITH cte AS (SELECT * FROM $t1)
|SELECT * FROM cte
""".stripMargin),
spark.table("source"))
}
}
test("Relation: view text") {
val t1 = "testcat.ns1.ns2.tbl"
val v1 = "view1"
withTable(t1) {
withView(v1) {
sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
sql(s"CREATE VIEW $v1 AS SELECT * from $t1")
checkAnswer(sql(s"TABLE $v1"), spark.table("source"))
}
}
}
test("Relation: join tables in 2 catalogs") {
val t1 = "testcat.ns1.ns2.tbl"
val t2 = "testcat2.v2tbl"
withTable(t1, t2) {
sql(s"CREATE TABLE $t1 USING foo AS SELECT id, data FROM source")
sql(s"CREATE TABLE $t2 USING foo AS SELECT id, data FROM source2")
val df1 = spark.table("source")
val df2 = spark.table("source2")
val df_joined = df1.join(df2).where(df1("id") + 1 === df2("id"))
checkAnswer(
sql(s"""
|SELECT *
|FROM $t1 t1, $t2 t2
|WHERE t1.id + 1 = t2.id
""".stripMargin),
df_joined)
}
}
test("qualified column names for v2 tables") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, point struct<x: bigint, y: bigint>) USING foo")
sql(s"INSERT INTO $t VALUES (1, (10, 20))")
def check(tbl: String): Unit = {
checkAnswer(
sql(s"SELECT testcat.ns1.ns2.tbl.id, testcat.ns1.ns2.tbl.point.x FROM $tbl"),
Row(1, 10))
checkAnswer(sql(s"SELECT ns1.ns2.tbl.id, ns1.ns2.tbl.point.x FROM $tbl"), Row(1, 10))
checkAnswer(sql(s"SELECT ns2.tbl.id, ns2.tbl.point.x FROM $tbl"), Row(1, 10))
checkAnswer(sql(s"SELECT tbl.id, tbl.point.x FROM $tbl"), Row(1, 10))
}
// Test with qualified table name "testcat.ns1.ns2.tbl".
check(t)
// Test if current catalog and namespace is respected in column resolution.
sql("USE testcat.ns1.ns2")
check("tbl")
val ex = intercept[AnalysisException] {
sql(s"SELECT ns1.ns2.ns3.tbl.id from $t")
}
assert(ex.getMessage.contains("cannot resolve '`ns1.ns2.ns3.tbl.id`"))
}
}
test("qualified column names for v1 tables") {
Seq(true, false).foreach { useV1Table =>
val format = if (useV1Table) "json" else v2Format
if (useV1Table) {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
} else {
spark.conf.set(
V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[InMemoryTableSessionCatalog].getName)
}
withTable("t") {
sql(s"CREATE TABLE t USING $format AS SELECT 1 AS i")
checkAnswer(sql("select i from t"), Row(1))
checkAnswer(sql("select t.i from t"), Row(1))
checkAnswer(sql("select default.t.i from t"), Row(1))
checkAnswer(sql("select spark_catalog.default.t.i from t"), Row(1))
checkAnswer(sql("select t.i from spark_catalog.default.t"), Row(1))
checkAnswer(sql("select default.t.i from spark_catalog.default.t"), Row(1))
checkAnswer(sql("select spark_catalog.default.t.i from spark_catalog.default.t"), Row(1))
}
}
}
test("InsertInto: append - across catalog") {
val t1 = "testcat.ns1.ns2.tbl"
val t2 = "testcat2.db.tbl"
withTable(t1, t2) {
sql(s"CREATE TABLE $t1 USING foo AS SELECT * FROM source")
sql(s"CREATE TABLE $t2 (id bigint, data string) USING foo")
sql(s"INSERT INTO $t2 SELECT * FROM $t1")
checkAnswer(spark.table(t2), spark.table("source"))
}
}
test("ShowTables: using v2 catalog") {
spark.sql("CREATE TABLE testcat.db.table_name (id bigint, data string) USING foo")
spark.sql("CREATE TABLE testcat.n1.n2.db.table_name (id bigint, data string) USING foo")
runShowTablesSql("SHOW TABLES FROM testcat.db", Seq(Row("db", "table_name")))
runShowTablesSql(
"SHOW TABLES FROM testcat.n1.n2.db",
Seq(Row("n1.n2.db", "table_name")))
}
test("ShowTables: using v2 catalog with a pattern") {
spark.sql("CREATE TABLE testcat.db.table (id bigint, data string) USING foo")
spark.sql("CREATE TABLE testcat.db.table_name_1 (id bigint, data string) USING foo")
spark.sql("CREATE TABLE testcat.db.table_name_2 (id bigint, data string) USING foo")
spark.sql("CREATE TABLE testcat.db2.table_name_2 (id bigint, data string) USING foo")
runShowTablesSql(
"SHOW TABLES FROM testcat.db",
Seq(
Row("db", "table"),
Row("db", "table_name_1"),
Row("db", "table_name_2")))
runShowTablesSql(
"SHOW TABLES FROM testcat.db LIKE '*name*'",
Seq(Row("db", "table_name_1"), Row("db", "table_name_2")))
runShowTablesSql(
"SHOW TABLES FROM testcat.db LIKE '*2'",
Seq(Row("db", "table_name_2")))
}
test("ShowTables: using v2 catalog, namespace doesn't exist") {
runShowTablesSql("SHOW TABLES FROM testcat.unknown", Seq())
}
test("ShowTables: using v1 catalog") {
runShowTablesSql(
"SHOW TABLES FROM default",
Seq(Row("", "source", true), Row("", "source2", true)),
expectV2Catalog = false)
}
test("ShowTables: using v1 catalog, db doesn't exist ") {
// 'db' below resolves to a database name for v1 catalog because there is no catalog named
// 'db' and there is no default catalog set.
val exception = intercept[NoSuchDatabaseException] {
runShowTablesSql("SHOW TABLES FROM db", Seq(), expectV2Catalog = false)
}
assert(exception.getMessage.contains("Database 'db' not found"))
}
test("ShowTables: using v1 catalog, db name with multipartIdentifier ('a.b') is not allowed.") {
val exception = intercept[AnalysisException] {
runShowTablesSql("SHOW TABLES FROM a.b", Seq(), expectV2Catalog = false)
}
assert(exception.getMessage.contains("The database name is not valid: a.b"))
}
test("ShowViews: using v1 catalog, db name with multipartIdentifier ('a.b') is not allowed.") {
val exception = intercept[AnalysisException] {
sql("SHOW TABLES FROM a.b")
}
assert(exception.getMessage.contains("The database name is not valid: a.b"))
}
test("ShowViews: using v2 catalog, command not supported.") {
val exception = intercept[AnalysisException] {
sql("SHOW VIEWS FROM testcat")
}
assert(exception.getMessage.contains("Catalog testcat doesn't support SHOW VIEWS," +
" only SessionCatalog supports this command."))
}
test("ShowTables: using v2 catalog with empty namespace") {
spark.sql("CREATE TABLE testcat.table (id bigint, data string) USING foo")
runShowTablesSql("SHOW TABLES FROM testcat", Seq(Row("", "table")))
}
test("ShowTables: namespace is not specified and default v2 catalog is set") {
spark.conf.set(SQLConf.DEFAULT_CATALOG.key, "testcat")
spark.sql("CREATE TABLE testcat.table (id bigint, data string) USING foo")
// v2 catalog is used where default namespace is empty for TestInMemoryTableCatalog.
runShowTablesSql("SHOW TABLES", Seq(Row("", "table")))
}
test("ShowTables: namespace not specified and default v2 catalog not set - fallback to v1") {
runShowTablesSql(
"SHOW TABLES",
Seq(Row("", "source", true), Row("", "source2", true)),
expectV2Catalog = false)
runShowTablesSql(
"SHOW TABLES LIKE '*2'",
Seq(Row("", "source2", true)),
expectV2Catalog = false)
}
test("ShowTables: change current catalog and namespace with USE statements") {
sql("CREATE TABLE testcat.ns1.ns2.table (id bigint) USING foo")
// Initially, the v2 session catalog (current catalog) is used.
runShowTablesSql(
"SHOW TABLES", Seq(Row("", "source", true), Row("", "source2", true)),
expectV2Catalog = false)
// Update the current catalog, and no table is matched since the current namespace is Array().
sql("USE testcat")
runShowTablesSql("SHOW TABLES", Seq())
// Update the current namespace to match ns1.ns2.table.
sql("USE testcat.ns1.ns2")
runShowTablesSql("SHOW TABLES", Seq(Row("ns1.ns2", "table")))
}
private def runShowTablesSql(
sqlText: String,
expected: Seq[Row],
expectV2Catalog: Boolean = true): Unit = {
val schema = if (expectV2Catalog) {
new StructType()
.add("namespace", StringType, nullable = false)
.add("tableName", StringType, nullable = false)
} else {
new StructType()
.add("database", StringType, nullable = false)
.add("tableName", StringType, nullable = false)
.add("isTemporary", BooleanType, nullable = false)
}
val df = spark.sql(sqlText)
assert(df.schema === schema)
assert(expected === df.collect())
}
test("SHOW TABLE EXTENDED not valid v1 database") {
def testV1CommandNamespace(sqlCommand: String, namespace: String): Unit = {
val e = intercept[AnalysisException] {
sql(sqlCommand)
}
assert(e.message.contains(s"The database name is not valid: ${namespace}"))
}
val namespace = "testcat.ns1.ns2"
val table = "tbl"
withTable(s"$namespace.$table") {
sql(s"CREATE TABLE $namespace.$table (id bigint, data string) " +
s"USING foo PARTITIONED BY (id)")
testV1CommandNamespace(s"SHOW TABLE EXTENDED FROM $namespace LIKE 'tb*'",
namespace)
testV1CommandNamespace(s"SHOW TABLE EXTENDED IN $namespace LIKE 'tb*'",
namespace)
testV1CommandNamespace("SHOW TABLE EXTENDED " +
s"FROM $namespace LIKE 'tb*' PARTITION(id=1)",
namespace)
testV1CommandNamespace("SHOW TABLE EXTENDED " +
s"IN $namespace LIKE 'tb*' PARTITION(id=1)",
namespace)
}
}
test("SHOW TABLE EXTENDED valid v1") {
val expected = Seq(Row("", "source", true), Row("", "source2", true))
val schema = new StructType()
.add("database", StringType, nullable = false)
.add("tableName", StringType, nullable = false)
.add("isTemporary", BooleanType, nullable = false)
.add("information", StringType, nullable = false)
val df = sql("SHOW TABLE EXTENDED FROM default LIKE '*source*'")
val result = df.collect()
val resultWithoutInfo = result.map{ case Row(db, table, temp, _) => Row(db, table, temp)}
assert(df.schema === schema)
assert(resultWithoutInfo === expected)
result.foreach{ case Row(_, _, _, info: String) => assert(info.nonEmpty)}
}
test("CreateNameSpace: basic tests") {
// Session catalog is used.
withNamespace("ns") {
sql("CREATE NAMESPACE ns")
testShowNamespaces("SHOW NAMESPACES", Seq("default", "ns"))
}
// V2 non-session catalog is used.
withNamespace("testcat.ns1.ns2") {
sql("CREATE NAMESPACE testcat.ns1.ns2")
testShowNamespaces("SHOW NAMESPACES IN testcat", Seq("ns1"))
testShowNamespaces("SHOW NAMESPACES IN testcat.ns1", Seq("ns1.ns2"))
}
withNamespace("testcat.test") {
withTempDir { tmpDir =>
val path = tmpDir.getCanonicalPath
sql(s"CREATE NAMESPACE testcat.test LOCATION '$path'")
val metadata =
catalog("testcat").asNamespaceCatalog.loadNamespaceMetadata(Array("test")).asScala
val catalogPath = metadata(SupportsNamespaces.PROP_LOCATION)
assert(catalogPath.equals(catalogPath))
}
}
}
test("CreateNameSpace: test handling of 'IF NOT EXIST'") {
withNamespace("testcat.ns1") {
sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1")
// The 'ns1' namespace already exists, so this should fail.
val exception = intercept[NamespaceAlreadyExistsException] {
sql("CREATE NAMESPACE testcat.ns1")
}
assert(exception.getMessage.contains("Namespace 'ns1' already exists"))
// The following will be no-op since the namespace already exists.
sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1")
}
}
test("CreateNameSpace: reserved properties") {
import SupportsNamespaces._
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) {
CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
val exception = intercept[ParseException] {
sql(s"CREATE NAMESPACE testcat.reservedTest WITH DBPROPERTIES('$key'='dummyVal')")
}
assert(exception.getMessage.contains(s"$key is a reserved namespace property"))
}
}
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) {
CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
withNamespace("testcat.reservedTest") {
sql(s"CREATE NAMESPACE testcat.reservedTest WITH DBPROPERTIES('$key'='foo')")
assert(sql("DESC NAMESPACE EXTENDED testcat.reservedTest")
.toDF("k", "v")
.where("k='Properties'")
.isEmpty, s"$key is a reserved namespace property and ignored")
val meta =
catalog("testcat").asNamespaceCatalog.loadNamespaceMetadata(Array("reservedTest"))
assert(meta.get(key) == null || !meta.get(key).contains("foo"),
"reserved properties should not have side effects")
}
}
}
}
test("create/replace/alter table - reserved properties") {
import TableCatalog._
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) {
CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
Seq("OPTIONS", "TBLPROPERTIES").foreach { clause =>
Seq("CREATE", "REPLACE").foreach { action =>
val e = intercept[ParseException] {
sql(s"$action TABLE testcat.reservedTest (key int) USING foo $clause ('$key'='bar')")
}
assert(e.getMessage.contains(s"$key is a reserved table property"))
}
}
val e1 = intercept[ParseException] {
sql(s"ALTER TABLE testcat.reservedTest SET TBLPROPERTIES ('$key'='bar')")
}
assert(e1.getMessage.contains(s"$key is a reserved table property"))
val e2 = intercept[ParseException] {
sql(s"ALTER TABLE testcat.reservedTest UNSET TBLPROPERTIES ('$key')")
}
assert(e2.getMessage.contains(s"$key is a reserved table property"))
}
}
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) {
CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
Seq("OPTIONS", "TBLPROPERTIES").foreach { clause =>
withTable("testcat.reservedTest") {
Seq("CREATE", "REPLACE").foreach { action =>
sql(s"$action TABLE testcat.reservedTest (key int) USING foo $clause ('$key'='bar')")
val tableCatalog = catalog("testcat").asTableCatalog
val identifier = Identifier.of(Array(), "reservedTest")
val originValue = tableCatalog.loadTable(identifier).properties().get(key)
assert(originValue != "bar", "reserved properties should not have side effects")
sql(s"ALTER TABLE testcat.reservedTest SET TBLPROPERTIES ('$key'='newValue')")
assert(tableCatalog.loadTable(identifier).properties().get(key) == originValue,
"reserved properties should not have side effects")
sql(s"ALTER TABLE testcat.reservedTest UNSET TBLPROPERTIES ('$key')")
assert(tableCatalog.loadTable(identifier).properties().get(key) == originValue,
"reserved properties should not have side effects")
}
}
}
}
}
}
test("create/replace - path property") {
Seq("true", "false").foreach { conf =>
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, conf)) {
withTable("testcat.reservedTest") {
Seq("CREATE", "REPLACE").foreach { action =>
val e1 = intercept[ParseException] {
sql(s"$action TABLE testcat.reservedTest USING foo LOCATION 'foo' OPTIONS" +
s" ('path'='bar')")
}
assert(e1.getMessage.contains(s"Duplicated table paths found: 'foo' and 'bar'"))
val e2 = intercept[ParseException] {
sql(s"$action TABLE testcat.reservedTest USING foo OPTIONS" +
s" ('path'='foo', 'PaTh'='bar')")
}
assert(e2.getMessage.contains(s"Duplicated table paths found: 'foo' and 'bar'"))
sql(s"$action TABLE testcat.reservedTest USING foo LOCATION 'foo' TBLPROPERTIES" +
s" ('path'='bar', 'Path'='noop')")
val tableCatalog = catalog("testcat").asTableCatalog
val identifier = Identifier.of(Array(), "reservedTest")
assert(tableCatalog.loadTable(identifier).properties()
.get(TableCatalog.PROP_LOCATION) == "foo",
"path as a table property should not have side effects")
assert(tableCatalog.loadTable(identifier).properties().get("path") == "bar",
"path as a table property should not have side effects")
assert(tableCatalog.loadTable(identifier).properties().get("Path") == "noop",
"path as a table property should not have side effects")
}
}
}
}
}
test("DropNamespace: basic tests") {
// Session catalog is used.
sql("CREATE NAMESPACE ns")
testShowNamespaces("SHOW NAMESPACES", Seq("default", "ns"))
sql("DROP NAMESPACE ns")
testShowNamespaces("SHOW NAMESPACES", Seq("default"))
// V2 non-session catalog is used.
sql("CREATE NAMESPACE testcat.ns1")
testShowNamespaces("SHOW NAMESPACES IN testcat", Seq("ns1"))
sql("DROP NAMESPACE testcat.ns1")
testShowNamespaces("SHOW NAMESPACES IN testcat", Seq())
}
test("DropNamespace: drop non-empty namespace with a non-cascading mode") {
sql("CREATE TABLE testcat.ns1.table (id bigint) USING foo")
sql("CREATE TABLE testcat.ns1.ns2.table (id bigint) USING foo")
testShowNamespaces("SHOW NAMESPACES IN testcat", Seq("ns1"))
testShowNamespaces("SHOW NAMESPACES IN testcat.ns1", Seq("ns1.ns2"))
def assertDropFails(): Unit = {
val e = intercept[SparkException] {
sql("DROP NAMESPACE testcat.ns1")
}
assert(e.getMessage.contains("Cannot drop a non-empty namespace: ns1"))
}
// testcat.ns1.table is present, thus testcat.ns1 cannot be dropped.
assertDropFails()
sql("DROP TABLE testcat.ns1.table")
// testcat.ns1.ns2.table is present, thus testcat.ns1 cannot be dropped.
assertDropFails()
sql("DROP TABLE testcat.ns1.ns2.table")
// testcat.ns1.ns2 namespace is present, thus testcat.ns1 cannot be dropped.
assertDropFails()
sql("DROP NAMESPACE testcat.ns1.ns2")
// Now that testcat.ns1 is empty, it can be dropped.
sql("DROP NAMESPACE testcat.ns1")
testShowNamespaces("SHOW NAMESPACES IN testcat", Seq())
}
test("DropNamespace: drop non-empty namespace with a cascade mode") {
sql("CREATE TABLE testcat.ns1.table (id bigint) USING foo")
sql("CREATE TABLE testcat.ns1.ns2.table (id bigint) USING foo")
testShowNamespaces("SHOW NAMESPACES IN testcat", Seq("ns1"))
testShowNamespaces("SHOW NAMESPACES IN testcat.ns1", Seq("ns1.ns2"))
sql("DROP NAMESPACE testcat.ns1 CASCADE")
testShowNamespaces("SHOW NAMESPACES IN testcat", Seq())
}
test("DropNamespace: test handling of 'IF EXISTS'") {
sql("DROP NAMESPACE IF EXISTS testcat.unknown")
val exception = intercept[NoSuchNamespaceException] {
sql("DROP NAMESPACE testcat.ns1")
}
assert(exception.getMessage.contains("Namespace 'ns1' not found"))
}
test("DescribeNamespace using v2 catalog") {
withNamespace("testcat.ns1.ns2") {
sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1.ns2 COMMENT " +
"'test namespace' LOCATION '/tmp/ns_test'")
val descriptionDf = sql("DESCRIBE NAMESPACE testcat.ns1.ns2")
assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
Seq(
("name", StringType),
("value", StringType)
))
val description = descriptionDf.collect()
assert(description === Seq(
Row("Namespace Name", "ns2"),
Row(SupportsNamespaces.PROP_COMMENT.capitalize, "test namespace"),
Row(SupportsNamespaces.PROP_LOCATION.capitalize, "/tmp/ns_test"),
Row(SupportsNamespaces.PROP_OWNER.capitalize, defaultUser))
)
}
}
test("AlterNamespaceSetProperties using v2 catalog") {
withNamespace("testcat.ns1.ns2") {
sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1.ns2 COMMENT " +
"'test namespace' LOCATION '/tmp/ns_test' WITH PROPERTIES ('a'='a','b'='b','c'='c')")
sql("ALTER NAMESPACE testcat.ns1.ns2 SET PROPERTIES ('a'='b','b'='a')")
val descriptionDf = sql("DESCRIBE NAMESPACE EXTENDED testcat.ns1.ns2")
assert(descriptionDf.collect() === Seq(
Row("Namespace Name", "ns2"),
Row(SupportsNamespaces.PROP_COMMENT.capitalize, "test namespace"),
Row(SupportsNamespaces.PROP_LOCATION.capitalize, "/tmp/ns_test"),
Row(SupportsNamespaces.PROP_OWNER.capitalize, defaultUser),
Row("Properties", "((a,b),(b,a),(c,c))"))
)
}
}
test("AlterNamespaceSetProperties: reserved properties") {
import SupportsNamespaces._
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) {
CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
withNamespace("testcat.reservedTest") {
sql("CREATE NAMESPACE testcat.reservedTest")
val exception = intercept[ParseException] {
sql(s"ALTER NAMESPACE testcat.reservedTest SET PROPERTIES ('$key'='dummyVal')")
}
assert(exception.getMessage.contains(s"$key is a reserved namespace property"))
}
}
}
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) {
CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
withNamespace("testcat.reservedTest") {
sql(s"CREATE NAMESPACE testcat.reservedTest")
sql(s"ALTER NAMESPACE testcat.reservedTest SET PROPERTIES ('$key'='foo')")
assert(sql("DESC NAMESPACE EXTENDED testcat.reservedTest")
.toDF("k", "v")
.where("k='Properties'")
.isEmpty, s"$key is a reserved namespace property and ignored")
val meta =
catalog("testcat").asNamespaceCatalog.loadNamespaceMetadata(Array("reservedTest"))
assert(meta.get(key) == null || !meta.get(key).contains("foo"),
"reserved properties should not have side effects")
}
}
}
}
test("AlterNamespaceSetLocation using v2 catalog") {
withNamespace("testcat.ns1.ns2") {
sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1.ns2 COMMENT " +
"'test namespace' LOCATION '/tmp/ns_test_1'")
sql("ALTER NAMESPACE testcat.ns1.ns2 SET LOCATION '/tmp/ns_test_2'")
val descriptionDf = sql("DESCRIBE NAMESPACE EXTENDED testcat.ns1.ns2")
assert(descriptionDf.collect() === Seq(
Row("Namespace Name", "ns2"),
Row(SupportsNamespaces.PROP_COMMENT.capitalize, "test namespace"),
Row(SupportsNamespaces.PROP_LOCATION.capitalize, "/tmp/ns_test_2"),
Row(SupportsNamespaces.PROP_OWNER.capitalize, defaultUser))
)
}
}
test("ShowNamespaces: show root namespaces with default v2 catalog") {
spark.conf.set(SQLConf.DEFAULT_CATALOG.key, "testcat")
testShowNamespaces("SHOW NAMESPACES", Seq())
spark.sql("CREATE TABLE testcat.ns1.table (id bigint) USING foo")
spark.sql("CREATE TABLE testcat.ns1.ns1_1.table (id bigint) USING foo")
spark.sql("CREATE TABLE testcat.ns2.table (id bigint) USING foo")
testShowNamespaces("SHOW NAMESPACES", Seq("ns1", "ns2"))
testShowNamespaces("SHOW NAMESPACES LIKE '*1*'", Seq("ns1"))
}
test("ShowNamespaces: show namespaces with v2 catalog") {
spark.sql("CREATE TABLE testcat.ns1.table (id bigint) USING foo")
spark.sql("CREATE TABLE testcat.ns1.ns1_1.table (id bigint) USING foo")
spark.sql("CREATE TABLE testcat.ns1.ns1_2.table (id bigint) USING foo")
spark.sql("CREATE TABLE testcat.ns2.table (id bigint) USING foo")
spark.sql("CREATE TABLE testcat.ns2.ns2_1.table (id bigint) USING foo")
// Look up only with catalog name, which should list root namespaces.
testShowNamespaces("SHOW NAMESPACES IN testcat", Seq("ns1", "ns2"))
// Look up sub-namespaces.
testShowNamespaces("SHOW NAMESPACES IN testcat.ns1", Seq("ns1.ns1_1", "ns1.ns1_2"))
testShowNamespaces("SHOW NAMESPACES IN testcat.ns1 LIKE '*2*'", Seq("ns1.ns1_2"))
testShowNamespaces("SHOW NAMESPACES IN testcat.ns2", Seq("ns2.ns2_1"))
// Try to look up namespaces that do not exist.
testShowNamespaces("SHOW NAMESPACES IN testcat.ns3", Seq())
testShowNamespaces("SHOW NAMESPACES IN testcat.ns1.ns3", Seq())
}
test("ShowNamespaces: default v2 catalog is not set") {
spark.sql("CREATE TABLE testcat.ns.table (id bigint) USING foo")
// The current catalog is resolved to a v2 session catalog.
testShowNamespaces("SHOW NAMESPACES", Seq("default"))
}
test("ShowNamespaces: default v2 catalog doesn't support namespace") {
spark.conf.set(
"spark.sql.catalog.testcat_no_namspace",
classOf[BasicInMemoryTableCatalog].getName)
spark.conf.set(SQLConf.DEFAULT_CATALOG.key, "testcat_no_namspace")
val exception = intercept[AnalysisException] {
sql("SHOW NAMESPACES")
}
assert(exception.getMessage.contains("does not support namespaces"))
}
test("ShowNamespaces: v2 catalog doesn't support namespace") {
spark.conf.set(
"spark.sql.catalog.testcat_no_namspace",
classOf[BasicInMemoryTableCatalog].getName)
val exception = intercept[AnalysisException] {
sql("SHOW NAMESPACES in testcat_no_namspace")
}
assert(exception.getMessage.contains("does not support namespaces"))
}
test("ShowNamespaces: session catalog is used and namespace doesn't exist") {
val exception = intercept[AnalysisException] {
sql("SHOW NAMESPACES in dummy")
}
assert(exception.getMessage.contains("Namespace 'dummy' not found"))
}
test("ShowNamespaces: change catalog and namespace with USE statements") {
sql("CREATE TABLE testcat.ns1.ns2.table (id bigint) USING foo")
// Initially, the current catalog is a v2 session catalog.
testShowNamespaces("SHOW NAMESPACES", Seq("default"))
// Update the current catalog to 'testcat'.
sql("USE testcat")
testShowNamespaces("SHOW NAMESPACES", Seq("ns1"))
// Update the current namespace to 'ns1'.
sql("USE ns1")
// 'SHOW NAMESPACES' is not affected by the current namespace and lists root namespaces.
testShowNamespaces("SHOW NAMESPACES", Seq("ns1"))
}
private def testShowNamespaces(
sqlText: String,
expected: Seq[String]): Unit = {
val schema = new StructType().add("namespace", StringType, nullable = false)
val df = spark.sql(sqlText)
assert(df.schema === schema)
assert(df.collect().map(_.getAs[String](0)).sorted === expected.sorted)
}
test("Use: basic tests with USE statements") {
val catalogManager = spark.sessionState.catalogManager
// Validate the initial current catalog and namespace.
assert(catalogManager.currentCatalog.name() == SESSION_CATALOG_NAME)
assert(catalogManager.currentNamespace === Array("default"))
// The following implicitly creates namespaces.
sql("CREATE TABLE testcat.ns1.ns1_1.table (id bigint) USING foo")
sql("CREATE TABLE testcat2.ns2.ns2_2.table (id bigint) USING foo")
sql("CREATE TABLE testcat2.ns3.ns3_3.table (id bigint) USING foo")
sql("CREATE TABLE testcat2.testcat.table (id bigint) USING foo")
// Catalog is resolved to 'testcat'.
sql("USE testcat.ns1.ns1_1")
assert(catalogManager.currentCatalog.name() == "testcat")
assert(catalogManager.currentNamespace === Array("ns1", "ns1_1"))
// Catalog is resolved to 'testcat2'.
sql("USE testcat2.ns2.ns2_2")
assert(catalogManager.currentCatalog.name() == "testcat2")
assert(catalogManager.currentNamespace === Array("ns2", "ns2_2"))
// Only the namespace is changed.
sql("USE ns3.ns3_3")
assert(catalogManager.currentCatalog.name() == "testcat2")
assert(catalogManager.currentNamespace === Array("ns3", "ns3_3"))
// Only the namespace is changed (explicit).
sql("USE NAMESPACE testcat")
assert(catalogManager.currentCatalog.name() == "testcat2")
assert(catalogManager.currentNamespace === Array("testcat"))
// Catalog is resolved to `testcat`.
sql("USE testcat")
assert(catalogManager.currentCatalog.name() == "testcat")
assert(catalogManager.currentNamespace === Array())
}
test("Use: set v2 catalog as a current catalog") {
val catalogManager = spark.sessionState.catalogManager
assert(catalogManager.currentCatalog.name() == SESSION_CATALOG_NAME)
sql("USE testcat")
assert(catalogManager.currentCatalog.name() == "testcat")
}
test("Use: v2 session catalog is used and namespace does not exist") {
val exception = intercept[NoSuchDatabaseException] {
sql("USE ns1")
}
assert(exception.getMessage.contains("Database 'ns1' not found"))
}
test("SPARK-31100: Use: v2 catalog that implements SupportsNamespaces is used " +
"and namespace not exists") {
// Namespaces are required to exist for v2 catalogs that implements SupportsNamespaces.
val exception = intercept[NoSuchNamespaceException] {
sql("USE testcat.ns1.ns2")
}
assert(exception.getMessage.contains("Namespace 'ns1.ns2' not found"))
}
test("SPARK-31100: Use: v2 catalog that does not implement SupportsNameSpaces is used " +
"and namespace does not exist") {
// Namespaces are not required to exist for v2 catalogs
// that does not implement SupportsNamespaces.
withSQLConf("spark.sql.catalog.dummy" -> classOf[BasicInMemoryTableCatalog].getName) {
val catalogManager = spark.sessionState.catalogManager
sql("USE dummy.ns1")
assert(catalogManager.currentCatalog.name() == "dummy")
assert(catalogManager.currentNamespace === Array("ns1"))
}
}
test("ShowCurrentNamespace: basic tests") {
def testShowCurrentNamespace(expectedCatalogName: String, expectedNamespace: String): Unit = {
val schema = new StructType()
.add("catalog", StringType, nullable = false)
.add("namespace", StringType, nullable = false)
val df = sql("SHOW CURRENT NAMESPACE")
val rows = df.collect
assert(df.schema === schema)
assert(rows.length == 1)
assert(rows(0).getAs[String](0) === expectedCatalogName)
assert(rows(0).getAs[String](1) === expectedNamespace)
}
// Initially, the v2 session catalog is set as a current catalog.
testShowCurrentNamespace("spark_catalog", "default")
sql("USE testcat")
testShowCurrentNamespace("testcat", "")
sql("CREATE NAMESPACE testcat.ns1.ns2")
sql("USE testcat.ns1.ns2")
testShowCurrentNamespace("testcat", "ns1.ns2")
}
test("tableCreation: partition column case insensitive resolution") {
val testCatalog = catalog("testcat").asTableCatalog
val sessionCatalog = catalog(SESSION_CATALOG_NAME).asTableCatalog
def checkPartitioning(cat: TableCatalog, partition: String): Unit = {
val namespace = if (cat.name == SESSION_CATALOG_NAME) {
Array("default")
} else {
Array[String]()
}
val table = cat.loadTable(Identifier.of(namespace, "tbl"))
val partitions = table.partitioning().map(_.references())
assert(partitions.length === 1)
val fieldNames = partitions.flatMap(_.map(_.fieldNames()))
assert(fieldNames === Array(Array(partition)))
}
sql(s"CREATE TABLE tbl (a int, b string) USING $v2Source PARTITIONED BY (A)")
checkPartitioning(sessionCatalog, "a")
sql(s"CREATE TABLE testcat.tbl (a int, b string) USING $v2Source PARTITIONED BY (A)")
checkPartitioning(testCatalog, "a")
sql(s"CREATE OR REPLACE TABLE tbl (a int, b string) USING $v2Source PARTITIONED BY (B)")
checkPartitioning(sessionCatalog, "b")
sql(s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) USING $v2Source PARTITIONED BY (B)")
checkPartitioning(testCatalog, "b")
}
test("tableCreation: partition column case sensitive resolution") {
def checkFailure(statement: String): Unit = {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val e = intercept[AnalysisException] {
sql(statement)
}
assert(e.getMessage.contains("Couldn't find column"))
}
}
checkFailure(s"CREATE TABLE tbl (a int, b string) USING $v2Source PARTITIONED BY (A)")
checkFailure(s"CREATE TABLE testcat.tbl (a int, b string) USING $v2Source PARTITIONED BY (A)")
checkFailure(
s"CREATE OR REPLACE TABLE tbl (a int, b string) USING $v2Source PARTITIONED BY (B)")
checkFailure(
s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) USING $v2Source PARTITIONED BY (B)")
}
test("tableCreation: duplicate column names in the table definition") {
val errorMsg = "Found duplicate column(s) in the table definition of"
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
assertAnalysisError(
s"CREATE TABLE t ($c0 INT, $c1 INT) USING $v2Source",
s"$errorMsg default.t"
)
assertAnalysisError(
s"CREATE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source",
s"$errorMsg t"
)
assertAnalysisError(
s"CREATE OR REPLACE TABLE t ($c0 INT, $c1 INT) USING $v2Source",
s"$errorMsg default.t"
)
assertAnalysisError(
s"CREATE OR REPLACE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source",
s"$errorMsg t"
)
}
}
}
test("tableCreation: duplicate nested column names in the table definition") {
val errorMsg = "Found duplicate column(s) in the table definition of"
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
assertAnalysisError(
s"CREATE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
s"$errorMsg default.t"
)
assertAnalysisError(
s"CREATE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
s"$errorMsg t"
)
assertAnalysisError(
s"CREATE OR REPLACE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
s"$errorMsg default.t"
)
assertAnalysisError(
s"CREATE OR REPLACE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
s"$errorMsg t"
)
}
}
}
test("tableCreation: bucket column names not in table definition") {
val errorMsg = "Couldn't find column c in"
assertAnalysisError(
s"CREATE TABLE tbl (a int, b string) USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS",
errorMsg
)
assertAnalysisError(
s"CREATE TABLE testcat.tbl (a int, b string) USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS",
errorMsg
)
assertAnalysisError(
s"CREATE OR REPLACE TABLE tbl (a int, b string) USING $v2Source " +
"CLUSTERED BY (c) INTO 4 BUCKETS",
errorMsg
)
assertAnalysisError(
s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) USING $v2Source " +
"CLUSTERED BY (c) INTO 4 BUCKETS",
errorMsg
)
}
test("tableCreation: bucket column name containing dot") {
withTable("t") {
sql(
"""
|CREATE TABLE testcat.t (id int, `a.b` string) USING foo
|CLUSTERED BY (`a.b`) INTO 4 BUCKETS
""".stripMargin)
val testCatalog = catalog("testcat").asTableCatalog.asInstanceOf[InMemoryTableCatalog]
val table = testCatalog.loadTable(Identifier.of(Array.empty, "t"))
val partitioning = table.partitioning()
assert(partitioning.length == 1 && partitioning.head.name() == "bucket")
val references = partitioning.head.references()
assert(references.length == 1)
assert(references.head.fieldNames().toSeq == Seq("a.b"))
}
}
test("tableCreation: column repeated in partition columns") {
val errorMsg = "Found duplicate column(s) in the partitioning"
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
assertAnalysisError(
s"CREATE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
errorMsg
)
assertAnalysisError(
s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
errorMsg
)
assertAnalysisError(
s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
errorMsg
)
assertAnalysisError(
s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
errorMsg
)
}
}
}
test("tableCreation: column repeated in bucket columns") {
val errorMsg = "Found duplicate column(s) in the bucket definition"
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
assertAnalysisError(
s"CREATE TABLE t ($c0 INT) USING $v2Source " +
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
errorMsg
)
assertAnalysisError(
s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source " +
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
errorMsg
)
assertAnalysisError(
s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source " +
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
errorMsg
)
assertAnalysisError(
s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) USING $v2Source " +
s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
errorMsg
)
}
}
}
test("REFRESH TABLE: v2 table") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
val testCatalog = catalog("testcat").asTableCatalog.asInstanceOf[InMemoryTableCatalog]
val identifier = Identifier.of(Array("ns1", "ns2"), "tbl")
assert(!testCatalog.isTableInvalidated(identifier))
sql(s"REFRESH TABLE $t")
assert(testCatalog.isTableInvalidated(identifier))
}
}
test("SPARK-32990: REFRESH TABLE should resolve to a temporary view first") {
withTable("testcat.ns.t") {
withTempView("t") {
sql("CREATE TABLE testcat.ns.t (id bigint) USING foo")
sql("CREATE TEMPORARY VIEW t AS SELECT 2")
sql("USE testcat.ns")
val testCatalog = catalog("testcat").asTableCatalog.asInstanceOf[InMemoryTableCatalog]
val identifier = Identifier.of(Array("ns"), "t")
assert(!testCatalog.isTableInvalidated(identifier))
sql("REFRESH TABLE t")
assert(!testCatalog.isTableInvalidated(identifier))
}
}
}
test("REPLACE TABLE: v1 table") {
val e = intercept[AnalysisException] {
sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
}
assert(e.message.contains("REPLACE TABLE is only supported with v2 tables"))
}
test("DeleteFrom: basic - delete all") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
sql(s"DELETE FROM $t")
checkAnswer(spark.table(t), Seq())
}
}
test("DeleteFrom: basic - delete with where clause") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
sql(s"DELETE FROM $t WHERE id = 2")
checkAnswer(spark.table(t), Seq(
Row(3, "c", 3)))
}
}
test("DeleteFrom: delete from aliased target table") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
sql(s"DELETE FROM $t AS tbl WHERE tbl.id = 2")
checkAnswer(spark.table(t), Seq(
Row(3, "c", 3)))
}
}
test("DeleteFrom: normalize attribute names") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
sql(s"DELETE FROM $t AS tbl WHERE tbl.ID = 2")
checkAnswer(spark.table(t), Seq(
Row(3, "c", 3)))
}
}
test("DeleteFrom: fail if has subquery") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
val exc = intercept[AnalysisException] {
sql(s"DELETE FROM $t WHERE id IN (SELECT id FROM $t)")
}
assert(spark.table(t).count === 3)
assert(exc.getMessage.contains("Delete by condition with subquery is not supported"))
}
}
test("DeleteFrom: DELETE is only supported with v2 tables") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
val v1Table = "tbl"
withTable(v1Table) {
sql(s"CREATE TABLE $v1Table" +
s" USING ${classOf[SimpleScanSource].getName} OPTIONS (from=0,to=1)")
val exc = intercept[AnalysisException] {
sql(s"DELETE FROM $v1Table WHERE i = 2")
}
assert(exc.getMessage.contains("DELETE is only supported with v2 tables"))
}
}
test("UPDATE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(
s"""
|CREATE TABLE $t (id bigint, name string, age int, p int)
|USING foo
|PARTITIONED BY (id, p)
""".stripMargin)
// UPDATE non-existing table
assertAnalysisError(
"UPDATE dummy SET name='abc'",
"Table or view not found")
// UPDATE non-existing column
assertAnalysisError(
s"UPDATE $t SET dummy='abc'",
"cannot resolve")
assertAnalysisError(
s"UPDATE $t SET name='abc' WHERE dummy=1",
"cannot resolve")
// UPDATE is not implemented yet.
val e = intercept[UnsupportedOperationException] {
sql(s"UPDATE $t SET name='Robert', age=32 WHERE p=1")
}
assert(e.getMessage.contains("UPDATE TABLE is not supported temporarily"))
}
}
test("MERGE INTO TABLE") {
val target = "testcat.ns1.ns2.target"
val source = "testcat.ns1.ns2.source"
withTable(target, source) {
sql(
s"""
|CREATE TABLE $target (id bigint, name string, age int, p int)
|USING foo
|PARTITIONED BY (id, p)
""".stripMargin)
sql(
s"""
|CREATE TABLE $source (id bigint, name string, age int, p int)
|USING foo
|PARTITIONED BY (id, p)
""".stripMargin)
// MERGE INTO non-existing table
assertAnalysisError(
s"""
|MERGE INTO testcat.ns1.ns2.dummy AS target
|USING testcat.ns1.ns2.source AS source
|ON target.id = source.id
|WHEN MATCHED AND (target.age < 10) THEN DELETE
|WHEN MATCHED AND (target.age > 10) THEN UPDATE SET *
|WHEN NOT MATCHED AND (target.col2='insert')
|THEN INSERT *
""".stripMargin,
"Table or view not found")
// USING non-existing table
assertAnalysisError(
s"""
|MERGE INTO testcat.ns1.ns2.target AS target
|USING testcat.ns1.ns2.dummy AS source
|ON target.id = source.id
|WHEN MATCHED AND (target.age < 10) THEN DELETE
|WHEN MATCHED AND (target.age > 10) THEN UPDATE SET *
|WHEN NOT MATCHED AND (target.col2='insert')
|THEN INSERT *
""".stripMargin,
"Table or view not found")
// UPDATE non-existing column
assertAnalysisError(
s"""
|MERGE INTO testcat.ns1.ns2.target AS target
|USING testcat.ns1.ns2.source AS source
|ON target.id = source.id
|WHEN MATCHED AND (target.age < 10) THEN DELETE
|WHEN MATCHED AND (target.age > 10) THEN UPDATE SET target.dummy = source.age
|WHEN NOT MATCHED AND (target.col2='insert')
|THEN INSERT *
""".stripMargin,
"cannot resolve")
// UPDATE using non-existing column
assertAnalysisError(
s"""
|MERGE INTO testcat.ns1.ns2.target AS target
|USING testcat.ns1.ns2.source AS source
|ON target.id = source.id
|WHEN MATCHED AND (target.age < 10) THEN DELETE
|WHEN MATCHED AND (target.age > 10) THEN UPDATE SET target.age = source.dummy
|WHEN NOT MATCHED AND (target.col2='insert')
|THEN INSERT *
""".stripMargin,
"cannot resolve")
// MERGE INTO is not implemented yet.
val e = intercept[UnsupportedOperationException] {
sql(
s"""
|MERGE INTO testcat.ns1.ns2.target AS target
|USING testcat.ns1.ns2.source AS source
|ON target.id = source.id
|WHEN MATCHED AND (target.p < 0) THEN DELETE
|WHEN MATCHED AND (target.p > 0) THEN UPDATE SET *
|WHEN NOT MATCHED THEN INSERT *
""".stripMargin)
}
assert(e.getMessage.contains("MERGE INTO TABLE is not supported temporarily"))
}
}
test("AlterTable: rename table basic test") {
withTable("testcat.ns1.new") {
sql(s"CREATE TABLE testcat.ns1.ns2.old USING foo AS SELECT id, data FROM source")
checkAnswer(sql("SHOW TABLES FROM testcat.ns1.ns2"), Seq(Row("ns1.ns2", "old")))
sql(s"ALTER TABLE testcat.ns1.ns2.old RENAME TO ns1.new")
checkAnswer(sql("SHOW TABLES FROM testcat.ns1.ns2"), Seq.empty)
checkAnswer(sql("SHOW TABLES FROM testcat.ns1"), Seq(Row("ns1", "new")))
}
}
test("AlterTable: renaming views are not supported") {
val e = intercept[AnalysisException] {
sql(s"ALTER VIEW testcat.ns.tbl RENAME TO ns.view")
}
assert(e.getMessage.contains("Renaming view is not supported in v2 catalogs"))
}
test("ANALYZE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
testV1Command("ANALYZE TABLE", s"$t COMPUTE STATISTICS")
testV1CommandSupportingTempView("ANALYZE TABLE", s"$t COMPUTE STATISTICS FOR ALL COLUMNS")
}
}
test("MSCK REPAIR TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
testV1Command("MSCK REPAIR TABLE", t)
}
}
test("TRUNCATE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(
s"""
|CREATE TABLE $t (id bigint, data string)
|USING foo
|PARTITIONED BY (id)
""".stripMargin)
testV1Command("TRUNCATE TABLE", t)
testV1Command("TRUNCATE TABLE", s"$t PARTITION(id='1')")
}
}
test("SHOW PARTITIONS") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(
s"""
|CREATE TABLE $t (id bigint, data string)
|USING foo
|PARTITIONED BY (id)
""".stripMargin)
testV1Command("SHOW PARTITIONS", t)
testV1Command("SHOW PARTITIONS", s"$t PARTITION(id='1')")
}
}
test("LOAD DATA INTO TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(
s"""
|CREATE TABLE $t (id bigint, data string)
|USING foo
|PARTITIONED BY (id)
""".stripMargin)
testV1Command("LOAD DATA", s"INPATH 'filepath' INTO TABLE $t")
testV1Command("LOAD DATA", s"LOCAL INPATH 'filepath' INTO TABLE $t")
testV1Command("LOAD DATA", s"LOCAL INPATH 'filepath' OVERWRITE INTO TABLE $t")
testV1Command("LOAD DATA",
s"LOCAL INPATH 'filepath' OVERWRITE INTO TABLE $t PARTITION(id=1)")
}
}
test("SHOW CREATE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
testV1CommandSupportingTempView("SHOW CREATE TABLE", t)
}
}
test("CACHE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
testV1CommandSupportingTempView("CACHE TABLE", t)
val e = intercept[AnalysisException] {
sql(s"CACHE LAZY TABLE $t")
}
assert(e.message.contains("CACHE TABLE is only supported with temp views or v1 tables"))
}
}
test("UNCACHE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
testV1CommandSupportingTempView("UNCACHE TABLE", t)
testV1CommandSupportingTempView("UNCACHE TABLE", s"IF EXISTS $t")
}
}
test("SHOW COLUMNS") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
testV1CommandSupportingTempView("SHOW COLUMNS", s"FROM $t")
testV1CommandSupportingTempView("SHOW COLUMNS", s"IN $t")
val e3 = intercept[AnalysisException] {
sql(s"SHOW COLUMNS FROM tbl IN testcat.ns1.ns2")
}
assert(e3.message.contains("Namespace name should have " +
"only one part if specified: testcat.ns1.ns2"))
}
}
test("ALTER TABLE RECOVER PARTITIONS") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $t RECOVER PARTITIONS")
}
assert(e.message.contains("ALTER TABLE RECOVER PARTITIONS is only supported with v1 tables"))
}
}
test("ALTER TABLE ADD PARTITION") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'")
}
assert(e.message.contains("ALTER TABLE ADD PARTITION is only supported with v1 tables"))
}
}
test("ALTER TABLE RENAME PARTITION") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $t PARTITION (id=1) RENAME TO PARTITION (id=2)")
}
assert(e.message.contains("ALTER TABLE RENAME PARTITION is only supported with v1 tables"))
}
}
test("ALTER TABLE DROP PARTITIONS") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $t DROP PARTITION (id=1)")
}
assert(e.message.contains("ALTER TABLE DROP PARTITION is only supported with v1 tables"))
}
}
test("ALTER TABLE SerDe properties") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $t SET SERDEPROPERTIES ('columns'='foo,bar', 'field.delim' = ',')")
}
assert(e.message.contains("ALTER TABLE SerDe Properties is only supported with v1 tables"))
}
}
test("ALTER VIEW AS QUERY") {
val v = "testcat.ns1.ns2.v"
val e = intercept[AnalysisException] {
sql(s"ALTER VIEW $v AS SELECT 1")
}
assert(e.message.contains("ALTER VIEW QUERY is only supported with temp views or v1 tables"))
}
test("CREATE VIEW") {
val v = "testcat.ns1.ns2.v"
val e = intercept[AnalysisException] {
sql(s"CREATE VIEW $v AS SELECT * FROM tab1")
}
assert(e.message.contains("CREATE VIEW is only supported with v1 tables"))
}
test("SHOW TBLPROPERTIES: v2 table") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
val user = "andrew"
val status = "new"
val provider = "foo"
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING $provider " +
s"TBLPROPERTIES ('user'='$user', 'status'='$status')")
val properties = sql(s"SHOW TBLPROPERTIES $t").orderBy("key")
val schema = new StructType()
.add("key", StringType, nullable = false)
.add("value", StringType, nullable = false)
val expected = Seq(
Row("status", status),
Row("user", user))
assert(properties.schema === schema)
assert(expected === properties.collect())
}
}
test("SHOW TBLPROPERTIES(key): v2 table") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
val user = "andrew"
val status = "new"
val provider = "foo"
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING $provider " +
s"TBLPROPERTIES ('user'='$user', 'status'='$status')")
val properties = sql(s"SHOW TBLPROPERTIES $t ('status')")
val expected = Seq(Row("status", status))
assert(expected === properties.collect())
}
}
test("SHOW TBLPROPERTIES(key): v2 table, key not found") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
val nonExistingKey = "nonExistingKey"
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo " +
s"TBLPROPERTIES ('user'='andrew', 'status'='new')")
val properties = sql(s"SHOW TBLPROPERTIES $t ('$nonExistingKey')")
val expected = Seq(Row(nonExistingKey, s"Table $t does not have property: $nonExistingKey"))
assert(expected === properties.collect())
}
}
test("DESCRIBE FUNCTION: only support session catalog") {
val e = intercept[AnalysisException] {
sql("DESCRIBE FUNCTION testcat.ns1.ns2.fun")
}
assert(e.message.contains("function is only supported in v1 catalog"))
val e1 = intercept[AnalysisException] {
sql("DESCRIBE FUNCTION default.ns1.ns2.fun")
}
assert(e1.message.contains(
"The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun"))
}
test("SHOW FUNCTIONS not valid v1 namespace") {
val function = "testcat.ns1.ns2.fun"
val e = intercept[AnalysisException] {
sql(s"SHOW FUNCTIONS LIKE $function")
}
assert(e.message.contains("function is only supported in v1 catalog"))
}
test("DROP FUNCTION: only support session catalog") {
val e = intercept[AnalysisException] {
sql("DROP FUNCTION testcat.ns1.ns2.fun")
}
assert(e.message.contains("function is only supported in v1 catalog"))
val e1 = intercept[AnalysisException] {
sql("DROP FUNCTION default.ns1.ns2.fun")
}
assert(e1.message.contains(
"The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun"))
}
test("CREATE FUNCTION: only support session catalog") {
val e = intercept[AnalysisException] {
sql("CREATE FUNCTION testcat.ns1.ns2.fun as 'f'")
}
assert(e.message.contains("function is only supported in v1 catalog"))
val e1 = intercept[AnalysisException] {
sql("CREATE FUNCTION default.ns1.ns2.fun as 'f'")
}
assert(e1.message.contains(
"The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun"))
}
test("REFRESH FUNCTION: only support session catalog") {
val e = intercept[AnalysisException] {
sql("REFRESH FUNCTION testcat.ns1.ns2.fun")
}
assert(e.message.contains("function is only supported in v1 catalog"))
val e1 = intercept[AnalysisException] {
sql("REFRESH FUNCTION default.ns1.ns2.fun")
}
assert(e1.message.contains(
"The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun"))
}
test("global temp view should not be masked by v2 catalog") {
val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
try {
sql("create global temp view v as select 1")
sql(s"alter view $globalTempDB.v rename to v2")
checkAnswer(spark.table(s"$globalTempDB.v2"), Row(1))
sql(s"drop view $globalTempDB.v2")
} finally {
spark.sharedState.globalTempViewManager.clear()
}
}
test("SPARK-30104: global temp db is used as a table name under v2 catalog") {
val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val t = s"testcat.$globalTempDB"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
sql("USE testcat")
// The following should not throw AnalysisException, but should use `testcat.$globalTempDB`.
sql(s"DESCRIBE TABLE $globalTempDB")
}
}
test("SPARK-30104: v2 catalog named global_temp will be masked") {
val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
val e = intercept[AnalysisException] {
// Since the following multi-part name starts with `globalTempDB`, it is resolved to
// the session catalog, not the `gloabl_temp` v2 catalog.
sql(s"CREATE TABLE $globalTempDB.ns1.ns2.tbl (id bigint, data string) USING json")
}
assert(e.message.contains(
"The namespace in session catalog must have exactly one name part: global_temp.ns1.ns2.tbl"))
}
test("table name same as catalog can be used") {
withTable("testcat.testcat") {
sql(s"CREATE TABLE testcat.testcat (id bigint, data string) USING foo")
sql("USE testcat")
// The following should not throw AnalysisException.
sql(s"DESCRIBE TABLE testcat")
}
}
test("SPARK-30001: session catalog name can be specified in SQL statements") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
withTable("t") {
sql("CREATE TABLE t USING json AS SELECT 1 AS i")
checkAnswer(sql("select * from t"), Row(1))
checkAnswer(sql("select * from spark_catalog.default.t"), Row(1))
}
}
test("SPARK-30885: v1 table name should be fully qualified") {
def assertWrongTableIdent(): Unit = {
withTable("t") {
sql("CREATE TABLE t USING json AS SELECT 1 AS i")
val t = "spark_catalog.t"
def verify(sql: String): Unit = {
val e = intercept[AnalysisException](spark.sql(sql))
assert(e.message.contains(
s"The namespace in session catalog must have exactly one name part: $t"))
}
verify(s"select * from $t")
// Verify V1 commands that bypass table lookups.
verify(s"REFRESH TABLE $t")
verify(s"DESCRIBE $t i")
verify(s"DROP TABLE $t")
verify(s"DROP VIEW $t")
verify(s"ANALYZE TABLE $t COMPUTE STATISTICS")
verify(s"ANALYZE TABLE $t COMPUTE STATISTICS FOR ALL COLUMNS")
verify(s"MSCK REPAIR TABLE $t")
verify(s"LOAD DATA INPATH 'filepath' INTO TABLE $t")
verify(s"SHOW CREATE TABLE $t")
verify(s"SHOW CREATE TABLE $t AS SERDE")
verify(s"CACHE TABLE $t")
verify(s"UNCACHE TABLE $t")
verify(s"TRUNCATE TABLE $t")
verify(s"SHOW PARTITIONS $t")
verify(s"SHOW COLUMNS FROM $t")
}
}
assertWrongTableIdent()
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
assertWrongTableIdent()
}
test("SPARK-30259: session catalog can be specified in CREATE TABLE AS SELECT command") {
withTable("tbl") {
val ident = Identifier.of(Array("default"), "tbl")
sql("CREATE TABLE spark_catalog.default.tbl USING json AS SELECT 1 AS i")
assert(catalog("spark_catalog").asTableCatalog.tableExists(ident) === true)
}
}
test("SPARK-30259: session catalog can be specified in CREATE TABLE command") {
withTable("tbl") {
val ident = Identifier.of(Array("default"), "tbl")
sql("CREATE TABLE spark_catalog.default.tbl (col string) USING json")
assert(catalog("spark_catalog").asTableCatalog.tableExists(ident) === true)
}
}
test("SPARK-30094: current namespace is used during table resolution") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
withTable("spark_catalog.default.t", "testcat.ns.t") {
sql("CREATE TABLE t USING parquet AS SELECT 1")
sql("CREATE TABLE testcat.ns.t USING parquet AS SELECT 2")
checkAnswer(sql("SELECT * FROM t"), Row(1))
sql("USE testcat.ns")
checkAnswer(sql("SELECT * FROM t"), Row(2))
}
}
test("SPARK-30284: CREATE VIEW should track the current catalog and namespace") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
val sessionCatalogName = CatalogManager.SESSION_CATALOG_NAME
sql("CREATE NAMESPACE testcat.ns1.ns2")
sql("USE testcat.ns1.ns2")
sql("CREATE TABLE t USING foo AS SELECT 1 col")
checkAnswer(spark.table("t"), Row(1))
withTempView("t") {
spark.range(10).createTempView("t")
withView(s"$sessionCatalogName.default.v") {
val e = intercept[AnalysisException] {
sql(s"CREATE VIEW $sessionCatalogName.default.v AS SELECT * FROM t")
}
assert(e.message.contains("referencing a temporary view"))
}
}
withTempView("t") {
withView(s"$sessionCatalogName.default.v") {
sql(s"CREATE VIEW $sessionCatalogName.default.v " +
"AS SELECT t1.col FROM t t1 JOIN ns1.ns2.t t2")
sql(s"USE $sessionCatalogName")
// The view should read data from table `testcat.ns1.ns2.t` not the temp view.
spark.range(10).createTempView("t")
checkAnswer(spark.table("v"), Row(1))
}
}
}
test("COMMENT ON NAMESPACE") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
// Session catalog is used.
sql("CREATE NAMESPACE ns")
checkNamespaceComment("ns", "minor revision")
checkNamespaceComment("ns", null)
checkNamespaceComment("ns", "NULL")
intercept[AnalysisException](sql("COMMENT ON NAMESPACE abc IS NULL"))
// V2 non-session catalog is used.
sql("CREATE NAMESPACE testcat.ns1")
checkNamespaceComment("testcat.ns1", "minor revision")
checkNamespaceComment("testcat.ns1", null)
checkNamespaceComment("testcat.ns1", "NULL")
intercept[AnalysisException](sql("COMMENT ON NAMESPACE testcat.abc IS NULL"))
}
private def checkNamespaceComment(namespace: String, comment: String): Unit = {
sql(s"COMMENT ON NAMESPACE $namespace IS " +
Option(comment).map("'" + _ + "'").getOrElse("NULL"))
val expectedComment = Option(comment).getOrElse("")
assert(sql(s"DESC NAMESPACE extended $namespace").toDF("k", "v")
.where(s"k='${SupportsNamespaces.PROP_COMMENT.capitalize}'")
.head().getString(1) === expectedComment)
}
test("COMMENT ON TABLE") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
// Session catalog is used.
withTable("t") {
sql("CREATE TABLE t(k int) USING json")
checkTableComment("t", "minor revision")
checkTableComment("t", null)
checkTableComment("t", "NULL")
}
intercept[AnalysisException](sql("COMMENT ON TABLE abc IS NULL"))
// V2 non-session catalog is used.
withTable("testcat.ns1.ns2.t") {
sql("CREATE TABLE testcat.ns1.ns2.t(k int) USING foo")
checkTableComment("testcat.ns1.ns2.t", "minor revision")
checkTableComment("testcat.ns1.ns2.t", null)
checkTableComment("testcat.ns1.ns2.t", "NULL")
}
intercept[AnalysisException](sql("COMMENT ON TABLE testcat.abc IS NULL"))
val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
withTempView("v") {
sql("create global temp view v as select 1")
val e = intercept[AnalysisException](sql("COMMENT ON TABLE global_temp.v IS NULL"))
assert(e.getMessage.contains("global_temp.v is a temp view not table."))
}
}
private def checkTableComment(tableName: String, comment: String): Unit = {
sql(s"COMMENT ON TABLE $tableName IS " + Option(comment).map("'" + _ + "'").getOrElse("NULL"))
val expectedComment = Option(comment).getOrElse("")
assert(sql(s"DESC extended $tableName").toDF("k", "v", "c")
.where(s"k='${TableCatalog.PROP_COMMENT.capitalize}'")
.head().getString(1) === expectedComment)
}
test("SPARK-30799: temp view name can't contain catalog name") {
val sessionCatalogName = CatalogManager.SESSION_CATALOG_NAME
withTempView("v") {
spark.range(10).createTempView("v")
val e1 = intercept[AnalysisException](
sql(s"CACHE TABLE $sessionCatalogName.v")
)
assert(e1.message.contains(
"The namespace in session catalog must have exactly one name part: spark_catalog.v"))
}
val e2 = intercept[AnalysisException] {
sql(s"CREATE TEMP VIEW $sessionCatalogName.v AS SELECT 1")
}
assert(e2.message.contains("It is not allowed to add database prefix"))
}
test("SPARK-31015: star expression should work for qualified column names for v2 tables") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, name string) USING foo")
sql(s"INSERT INTO $t VALUES (1, 'hello')")
def check(tbl: String): Unit = {
checkAnswer(sql(s"SELECT testcat.ns1.ns2.tbl.* FROM $tbl"), Row(1, "hello"))
checkAnswer(sql(s"SELECT ns1.ns2.tbl.* FROM $tbl"), Row(1, "hello"))
checkAnswer(sql(s"SELECT ns2.tbl.* FROM $tbl"), Row(1, "hello"))
checkAnswer(sql(s"SELECT tbl.* FROM $tbl"), Row(1, "hello"))
}
// Test with qualified table name "testcat.ns1.ns2.tbl".
check(t)
// Test if current catalog and namespace is respected in column resolution.
sql("USE testcat.ns1.ns2")
check("tbl")
val ex = intercept[AnalysisException] {
sql(s"SELECT ns1.ns2.ns3.tbl.* from $t")
}
assert(ex.getMessage.contains("cannot resolve 'ns1.ns2.ns3.tbl.*"))
}
}
test("SPARK-32168: INSERT OVERWRITE - hidden days partition - dynamic mode") {
def testTimestamp(daysOffset: Int): Timestamp = {
Timestamp.valueOf(LocalDate.of(2020, 1, 1 + daysOffset).atStartOfDay())
}
withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
val t1 = s"${catalogAndNamespace}tbl"
withTable(t1) {
val df = spark.createDataFrame(Seq(
(testTimestamp(1), "a"),
(testTimestamp(2), "b"),
(testTimestamp(3), "c"))).toDF("ts", "data")
df.createOrReplaceTempView("source_view")
sql(s"CREATE TABLE $t1 (ts timestamp, data string) " +
s"USING $v2Format PARTITIONED BY (days(ts))")
sql(s"INSERT INTO $t1 VALUES " +
s"(CAST(date_add('2020-01-01', 2) AS timestamp), 'dummy'), " +
s"(CAST(date_add('2020-01-01', 4) AS timestamp), 'keep')")
sql(s"INSERT OVERWRITE TABLE $t1 SELECT ts, data FROM source_view")
val expected = spark.createDataFrame(Seq(
(testTimestamp(1), "a"),
(testTimestamp(2), "b"),
(testTimestamp(3), "c"),
(testTimestamp(4), "keep"))).toDF("ts", "data")
verifyTable(t1, expected)
}
}
}
private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
}
assert(e.message.contains(s"$sqlCommand is only supported with v1 tables"))
}
private def testV1CommandSupportingTempView(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
}
assert(e.message.contains(s"$sqlCommand is only supported with temp views or v1 tables"))
}
private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
val errMsg = intercept[AnalysisException] {
sql(sqlStatement)
}.getMessage
assert(errMsg.contains(expectedError))
}
}
/** Used as a V2 DataSource for V2SessionCatalog DDL */
class FakeV2Provider extends SimpleTableProvider {
override def getTable(options: CaseInsensitiveStringMap): Table = {
throw new UnsupportedOperationException("Unnecessary for DDL tests")
}
}