/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.internal

import java.io.File

import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.catalog.{Column, Database, Function, Table}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, ScalaReflection, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.AnalysisTest
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.connector.FakeV2Provider
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.connector.catalog.functions._
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel

/**
* Tests for the user-facing [[org.apache.spark.sql.catalog.Catalog]].
*/
class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAfter {
import testImplicits._
private def sessionCatalog: SessionCatalog = spark.sessionState.catalog
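// Shared test fixtures: CatalogTestUtils provides template metadata objects
// (newDb, newTable, newFunc) backed by this session's external catalog.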
private val utils = new CatalogTestUtils {
override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat"
override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat"
override val defaultProvider: String = "parquet"
override def newEmptyCatalog(): ExternalCatalog = spark.sharedState.externalCatalog
}
private def createDatabase(name: String): Unit = {
sessionCatalog.createDatabase(utils.newDb(name), ignoreIfExists = false)
}
private def dropDatabase(name: String): Unit = {
sessionCatalog.dropDatabase(name, ignoreIfNotExists = false, cascade = true)
}
private def createTable(name: String, db: Option[String] = None): Unit = {
sessionCatalog.createTable(utils.newTable(name, db), ignoreIfExists = false)
}
private def createTable(name: String, db: String, catalog: String, source: String,
schema: StructType, option: Map[String, String], description: String): DataFrame = {
spark.catalog.createTable(Array(catalog, db, name).mkString("."), source,
schema, description, option)
}
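// A "temp table" in these tests is a temporary view registered over a small Range plan.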
private def createTempTable(name: String): Unit = {
createTempView(sessionCatalog, name, Range(1, 2, 3, 4), overrideIfExists = true)
}
private def dropTable(name: String, db: Option[String] = None): Unit = {
sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false, purge = false)
}
private def createFunction(name: String, db: Option[String] = None): Unit = {
sessionCatalog.createFunction(utils.newFunc(name, db), ignoreIfExists = false)
}
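// Registers a temporary function directly with the session catalog; the builder just
// returns its first argument, since these tests never evaluate the function.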
private def createTempFunction(name: String): Unit = {
val tempFunc = (e: Seq[Expression]) => e.head
val funcMeta = CatalogFunction(FunctionIdentifier(name, None), "className", Nil)
sessionCatalog.registerFunction(
funcMeta, overrideIfExists = false, functionBuilder = Some(tempFunc))
}
private def dropFunction(name: String, db: Option[String] = None): Unit = {
sessionCatalog.dropFunction(FunctionIdentifier(name, db), ignoreIfNotExists = false)
}
private def dropTempFunction(name: String): Unit = {
sessionCatalog.dropTempFunction(name, ignoreIfNotExists = false)
}
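// Cross-checks Catalog.listColumns against the table metadata in the session catalog,
// including the partition and bucket flags of each column.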
private def testListColumns(tableName: String, dbName: Option[String]): Unit = {
val tableMetadata = sessionCatalog.getTableMetadata(TableIdentifier(tableName, dbName))
val columns = dbName
.map { db => spark.catalog.listColumns(db, tableName) }
.getOrElse { spark.catalog.listColumns(tableName) }
assert(tableMetadata.schema.nonEmpty, "bad test")
assert(tableMetadata.partitionColumnNames.nonEmpty, "bad test")
assert(tableMetadata.bucketSpec.isDefined, "bad test")
assert(columns.collect().map(_.name).toSet == tableMetadata.schema.map(_.name).toSet)
val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
columns.collect().foreach { col =>
assert(col.isPartition == tableMetadata.partitionColumnNames.contains(col.name))
assert(col.isBucket == bucketColumnNames.contains(col.name))
}
dbName.foreach { db =>
val expected = columns.collect().map(_.name).toSet
assert(spark.catalog.listColumns(s"$db.$tableName").collect().map(_.name).toSet == expected)
}
}
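// Reset both the session catalog and the V2 catalog manager between tests so that
// databases, tables and functions do not leak across test cases.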
override def afterEach(): Unit = {
try {
sessionCatalog.reset()
spark.sessionState.catalogManager.reset()
} finally {
super.afterEach()
}
}
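// Register an in-memory V2 catalog under the name "testcat" for the multi-catalog tests.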
before {
spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
}
after {
spark.sessionState.catalogManager.reset()
spark.sessionState.conf.clear()
}
test("current database") {
assert(spark.catalog.currentDatabase == "default")
assert(sessionCatalog.getCurrentDatabase == "default")
createDatabase("my_db")
spark.catalog.setCurrentDatabase("my_db")
assert(spark.catalog.currentDatabase == "my_db")
assert(sessionCatalog.getCurrentDatabase == "my_db")
val e = intercept[AnalysisException] {
spark.catalog.setCurrentDatabase("unknown_db")
}
assert(e.getMessage.contains("unknown_db"))
}
test("list databases") {
assert(spark.catalog.listDatabases().collect().map(_.name).toSet == Set("default"))
createDatabase("my_db1")
createDatabase("my_db2")
assert(spark.catalog.listDatabases().collect().map(_.name).toSet ==
Set("default", "my_db1", "my_db2"))
assert(spark.catalog.listDatabases("my*").collect().map(_.name).toSet ==
Set("my_db1", "my_db2"))
assert(spark.catalog.listDatabases("you*").collect().map(_.name).toSet ==
Set.empty[String])
dropDatabase("my_db1")
assert(spark.catalog.listDatabases().collect().map(_.name).toSet ==
Set("default", "my_db2"))
}
test("list databases with current catalog") {
spark.catalog.setCurrentCatalog("testcat")
sql(s"CREATE NAMESPACE testcat.my_db")
sql(s"CREATE NAMESPACE testcat.my_db2")
assert(spark.catalog.listDatabases().collect().map(_.name).toSet == Set("my_db", "my_db2"))
}
test("list tables") {
assert(spark.catalog.listTables().collect().isEmpty)
createTable("my_table1")
createTable("my_table2")
createTempTable("my_temp_table")
assert(spark.catalog.listTables().collect().map(_.name).toSet ==
Set("my_table1", "my_table2", "my_temp_table"))
dropTable("my_table1")
assert(spark.catalog.listTables().collect().map(_.name).toSet ==
Set("my_table2", "my_temp_table"))
dropTable("my_temp_table")
assert(spark.catalog.listTables().collect().map(_.name).toSet == Set("my_table2"))
}
test("SPARK-39828: Catalog.listTables() should respect currentCatalog") {
assert(spark.catalog.currentCatalog() == "spark_catalog")
assert(spark.catalog.listTables().collect().isEmpty)
createTable("my_table1")
assert(spark.catalog.listTables().collect().map(_.name).toSet == Set("my_table1"))
val catalogName = "testcat"
val dbName = "my_db"
val tableName = "my_table2"
val tableSchema = new StructType().add("i", "int")
val description = "this is a test managed table"
sql(s"CREATE NAMESPACE $catalogName.$dbName")
spark.catalog.setCurrentCatalog("testcat")
spark.catalog.setCurrentDatabase("my_db")
assert(spark.catalog.listTables().collect().isEmpty)
createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName, tableSchema,
Map.empty[String, String], description)
assert(spark.catalog.listTables()
.collect()
.map(t => Array(t.catalog, t.namespace.mkString("."), t.name).mkString(".")).toSet ==
Set("testcat.my_db.my_table2"))
}
test("list tables with database") {
assert(spark.catalog.listTables("default").collect().isEmpty)
createDatabase("my_db1")
createDatabase("my_db2")
createTable("my_table1", Some("my_db1"))
createTable("my_table2", Some("my_db2"))
createTempTable("my_temp_table")
assert(spark.catalog.listTables("default").collect().map(_.name).toSet ==
Set("my_temp_table"))
assert(spark.catalog.listTables("my_db1").collect().map(_.name).toSet ==
Set("my_table1", "my_temp_table"))
assert(spark.catalog.listTables("my_db2").collect().map(_.name).toSet ==
Set("my_table2", "my_temp_table"))
dropTable("my_table1", Some("my_db1"))
assert(spark.catalog.listTables("my_db1").collect().map(_.name).toSet ==
Set("my_temp_table"))
assert(spark.catalog.listTables("my_db2").collect().map(_.name).toSet ==
Set("my_table2", "my_temp_table"))
dropTable("my_temp_table")
assert(spark.catalog.listTables("default").collect().map(_.name).isEmpty)
assert(spark.catalog.listTables("my_db1").collect().map(_.name).isEmpty)
assert(spark.catalog.listTables("my_db2").collect().map(_.name).toSet ==
Set("my_table2"))
val e = intercept[AnalysisException] {
spark.catalog.listTables("unknown_db")
}
assert(e.getMessage.contains("unknown_db"))
}
test("list functions") {
assert(Set("+", "current_database", "window").subsetOf(
spark.catalog.listFunctions().collect().map(_.name).toSet))
createFunction("my_func1")
createFunction("my_func2")
createTempFunction("my_temp_func")
val funcNames1 = spark.catalog.listFunctions().collect().map(_.name).toSet
assert(funcNames1.contains("my_func1"))
assert(funcNames1.contains("my_func2"))
assert(funcNames1.contains("my_temp_func"))
dropFunction("my_func1")
dropTempFunction("my_temp_func")
val funcNames2 = spark.catalog.listFunctions().collect().map(_.name).toSet
assert(!funcNames2.contains("my_func1"))
assert(funcNames2.contains("my_func2"))
assert(!funcNames2.contains("my_temp_func"))
}
test("SPARK-39828: Catalog.listFunctions() should respect currentCatalog") {
assert(spark.catalog.currentCatalog() == "spark_catalog")
assert(Set("+", "current_database", "window").subsetOf(
spark.catalog.listFunctions().collect().map(_.name).toSet))
createFunction("my_func")
assert(spark.catalog.listFunctions().collect().map(_.name).contains("my_func"))
sql(s"CREATE NAMESPACE testcat.ns")
spark.catalog.setCurrentCatalog("testcat")
spark.catalog.setCurrentDatabase("ns")
val funcCatalog = spark.sessionState.catalogManager.catalog("testcat")
.asInstanceOf[InMemoryCatalog]
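// A minimal V2 function definition; it is only registered and listed, never invoked.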
val function: UnboundFunction = new UnboundFunction {
override def bind(inputType: StructType): BoundFunction = new ScalarFunction[Int] {
override def inputTypes(): Array[DataType] = Array(IntegerType)
override def resultType(): DataType = IntegerType
override def name(): String = "my_bound_function"
}
override def description(): String = "my_function"
override def name(): String = "my_function"
}
assert(!spark.catalog.listFunctions().collect().map(_.name).contains("my_func"))
funcCatalog.createFunction(Identifier.of(Array("ns"), "my_func"), function)
assert(spark.catalog.listFunctions().collect().map(_.name).contains("my_func"))
}
test("list functions with database") {
assert(Set("+", "current_database", "window").subsetOf(
spark.catalog.listFunctions().collect().map(_.name).toSet))
createDatabase("my_db1")
createDatabase("my_db2")
createFunction("my_func1", Some("my_db1"))
createFunction("my_func2", Some("my_db2"))
createTempFunction("my_temp_func")
val funcNames1 = spark.catalog.listFunctions("my_db1").collect().map(_.name).toSet
val funcNames2 = spark.catalog.listFunctions("my_db2").collect().map(_.name).toSet
assert(funcNames1.contains("my_func1"))
assert(!funcNames1.contains("my_func2"))
assert(funcNames1.contains("my_temp_func"))
assert(!funcNames2.contains("my_func1"))
assert(funcNames2.contains("my_func2"))
assert(funcNames2.contains("my_temp_func"))
// Make sure database is set properly.
assert(
spark.catalog.listFunctions("my_db1").collect().map(_.database).toSet == Set("my_db1", null))
assert(
spark.catalog.listFunctions("my_db2").collect().map(_.database).toSet == Set("my_db2", null))
// Remove the functions and make sure they no longer appear.
dropFunction("my_func1", Some("my_db1"))
dropTempFunction("my_temp_func")
val funcNames1b = spark.catalog.listFunctions("my_db1").collect().map(_.name).toSet
val funcNames2b = spark.catalog.listFunctions("my_db2").collect().map(_.name).toSet
assert(!funcNames1b.contains("my_func1"))
assert(!funcNames1b.contains("my_temp_func"))
assert(funcNames2b.contains("my_func2"))
assert(!funcNames2b.contains("my_temp_func"))
val e = intercept[AnalysisException] {
spark.catalog.listFunctions("unknown_db")
}
assert(e.getMessage.contains("unknown_db"))
}
test("list columns") {
createTable("tab1")
testListColumns("tab1", dbName = None)
}
test("list columns in temporary table") {
createTempTable("temp1")
spark.catalog.listColumns("temp1")
}
test("list columns in database") {
createDatabase("db1")
createTable("tab1", Some("db1"))
testListColumns("tab1", dbName = Some("db1"))
}
test("SPARK-39615: qualified name with catalog - listColumns") {
val answers = Map(
"col1" -> ("int", true, false, true),
"col2" -> ("string", true, false, false),
"a" -> ("int", true, true, false),
"b" -> ("string", true, true, false)
)
assert(spark.catalog.currentCatalog() === "spark_catalog")
createTable("my_table1")
val columns1 = spark.catalog.listColumns("my_table1").collect()
assert(answers ===
columns1.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
val columns2 = spark.catalog.listColumns("default.my_table1").collect()
assert(answers ===
columns2.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
val columns3 = spark.catalog.listColumns("spark_catalog.default.my_table1").collect()
assert(answers ===
columns3.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
createDatabase("my_db1")
createTable("my_table2", Some("my_db1"))
val columns4 = spark.catalog.listColumns("my_db1.my_table2").collect()
assert(answers ===
columns4.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
val columns5 = spark.catalog.listColumns("spark_catalog.my_db1.my_table2").collect()
assert(answers ===
columns5.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
val catalogName = "testcat"
val dbName = "my_db2"
val tableName = "my_table2"
val tableSchema = new StructType().add("i", "int").add("j", "string")
val description = "this is a test managed table"
createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName, tableSchema,
Map.empty[String, String], description)
val columns6 = spark.catalog.listColumns("testcat.my_db2.my_table2").collect()
assert(Map("i" -> "int", "j" -> "string") === columns6.map(c => c.name -> c.dataType).toMap)
}
test("Database.toString") {
assert(new Database("cool_db", "cool_desc", "cool_path").toString ==
"Database[name='cool_db', description='cool_desc', path='cool_path']")
assert(new Database("cool_db", null, "cool_path").toString ==
"Database[name='cool_db', path='cool_path']")
}
test("Table.toString") {
assert(new Table("volley", null, Array("databasa"), "one", "world", isTemporary = true).toString
== "Table[name='volley', database='databasa', description='one', " +
"tableType='world', isTemporary='true']")
assert(new Table("volley", null, null, "world", isTemporary = true).toString ==
"Table[name='volley', tableType='world', isTemporary='true']")
}
test("Function.toString") {
assert(
new Function("nama", "databasa", "commenta", "classNameAh", isTemporary = true).toString ==
"Function[name='nama', database='databasa', description='commenta', " +
"className='classNameAh', isTemporary='true']")
assert(new Function("nama", null, null, "classNameAh", isTemporary = false).toString ==
"Function[name='nama', className='classNameAh', isTemporary='false']")
}
test("Column.toString") {
assert(new Column("namama", "descaca", "datatapa",
nullable = true, isPartition = false, isBucket = true).toString ==
"Column[name='namama', description='descaca', dataType='datatapa', " +
"nullable='true', isPartition='false', isBucket='true']")
assert(new Column("namama", null, "datatapa",
nullable = false, isPartition = true, isBucket = true).toString ==
"Column[name='namama', dataType='datatapa', " +
"nullable='false', isPartition='true', isBucket='true']")
}
test("catalog classes format in Dataset.show") {
val db = new Database("nama", "cataloa", "descripta", "locata")
val table = new Table("nama", "cataloa", Array("databasa"), "descripta", "typa",
isTemporary = false)
val function = new Function("nama", "cataloa", Array("databasa"), "descripta", "classa", false)
val column = new Column(
"nama", "descripta", "typa", nullable = false, isPartition = true, isBucket = true)
val dbFields = ScalaReflection.getConstructorParameterValues(db)
val tableFields = ScalaReflection.getConstructorParameterValues(table)
val functionFields = ScalaReflection.getConstructorParameterValues(function)
val columnFields = ScalaReflection.getConstructorParameterValues(column)
assert(dbFields == Seq("nama", "cataloa", "descripta", "locata"))
assert(Seq(tableFields(0), tableFields(1), tableFields(3), tableFields(4), tableFields(5)) ==
Seq("nama", "cataloa", "descripta", "typa", false))
assert(tableFields(2).asInstanceOf[Array[String]].sameElements(Array("databasa")))
assert((functionFields(0), functionFields(1), functionFields(3), functionFields(4),
functionFields(5)) == ("nama", "cataloa", "descripta", "classa", false))
assert(functionFields(2).asInstanceOf[Array[String]].sameElements(Array("databasa")))
assert(columnFields == Seq("nama", "descripta", "typa", false, true, true))
val dbString = CatalogImpl.makeDataset(Seq(db), spark).showString(10)
val tableString = CatalogImpl.makeDataset(Seq(table), spark).showString(10)
val functionString = CatalogImpl.makeDataset(Seq(function), spark).showString(10)
val columnString = CatalogImpl.makeDataset(Seq(column), spark).showString(10)
dbFields.foreach { f => assert(dbString.contains(f.toString)) }
tableFields.foreach { f => assert(tableString.contains(f.toString) ||
tableString.contains(f.asInstanceOf[Array[String]].mkString(""))) }
functionFields.foreach { f => assert(functionString.contains(f.toString) ||
functionString.contains(f.asInstanceOf[Array[String]].mkString(""))) }
columnFields.foreach { f => assert(columnString.contains(f.toString)) }
}
test("dropTempView should not un-cache and drop metastore table if a same-name table exists") {
withTable("same_name") {
spark.range(10).write.saveAsTable("same_name")
sql("CACHE TABLE same_name")
assert(spark.catalog.isCached("default.same_name"))
spark.catalog.dropTempView("same_name")
assert(spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default"))))
assert(spark.catalog.isCached("default.same_name"))
}
}
test("get database") {
intercept[AnalysisException](spark.catalog.getDatabase("db10"))
withTempDatabase { db =>
assert(spark.catalog.getDatabase(db).name === db)
}
}
test("get table") {
withTempDatabase { db =>
withTable(s"tbl_x", s"$db.tbl_y") {
// Try to find non-existent tables.
intercept[AnalysisException](spark.catalog.getTable("tbl_x"))
intercept[AnalysisException](spark.catalog.getTable("tbl_y"))
intercept[AnalysisException](spark.catalog.getTable(db, "tbl_y"))
// Create objects.
createTempTable("tbl_x")
createTable("tbl_y", Some(db))
// Find a temporary table
assert(spark.catalog.getTable("tbl_x").name === "tbl_x")
// Find a qualified table
assert(spark.catalog.getTable(db, "tbl_y").name === "tbl_y")
assert(spark.catalog.getTable(s"$db.tbl_y").name === "tbl_y")
// Find an unqualified table using the current database
intercept[AnalysisException](spark.catalog.getTable("tbl_y"))
spark.catalog.setCurrentDatabase(db)
assert(spark.catalog.getTable("tbl_y").name === "tbl_y")
}
}
}
test("get function") {
withTempDatabase { db =>
withUserDefinedFunction("fn1" -> true, s"$db.fn2" -> false) {
// Try to find non-existent functions.
intercept[AnalysisException](spark.catalog.getFunction("fn1"))
intercept[AnalysisException](spark.catalog.getFunction(db, "fn1"))
intercept[AnalysisException](spark.catalog.getFunction("fn2"))
intercept[AnalysisException](spark.catalog.getFunction(db, "fn2"))
// Create objects.
createTempFunction("fn1")
createFunction("fn2", Some(db))
// Find a temporary function
val fn1 = spark.catalog.getFunction("fn1")
assert(fn1.name === "fn1")
assert(fn1.database === null)
assert(fn1.isTemporary)
// Find a temporary function with database
intercept[AnalysisException](spark.catalog.getFunction(db, "fn1"))
// Find a qualified function
val fn2 = spark.catalog.getFunction(db, "fn2")
assert(fn2.name === "fn2")
assert(fn2.database === db)
assert(!fn2.isTemporary)
val fn2WithQualifiedName = spark.catalog.getFunction(s"$db.fn2")
assert(fn2WithQualifiedName.name === "fn2")
assert(fn2WithQualifiedName.database === db)
assert(!fn2WithQualifiedName.isTemporary)
// Find an unqualified function using the current database
intercept[AnalysisException](spark.catalog.getFunction("fn2"))
spark.catalog.setCurrentDatabase(db)
val unqualified = spark.catalog.getFunction("fn2")
assert(unqualified.name === "fn2")
assert(unqualified.database === db)
assert(!unqualified.isTemporary)
}
}
}
test("database exists") {
assert(!spark.catalog.databaseExists("db10"))
createDatabase("db10")
assert(spark.catalog.databaseExists("db10"))
dropDatabase("db10")
}
test("table exists") {
withTempDatabase { db =>
withTable(s"tbl_x", s"$db.tbl_y") {
// Try to find non-existent tables.
assert(!spark.catalog.tableExists("tbl_x"))
assert(!spark.catalog.tableExists("tbl_y"))
assert(!spark.catalog.tableExists(db, "tbl_y"))
assert(!spark.catalog.tableExists(s"$db.tbl_y"))
// Create objects.
createTempTable("tbl_x")
createTable("tbl_y", Some(db))
// Find a temporary table
assert(spark.catalog.tableExists("tbl_x"))
// Find a qualified table
assert(spark.catalog.tableExists(db, "tbl_y"))
assert(spark.catalog.tableExists(s"$db.tbl_y"))
// Find an unqualified table using the current database
assert(!spark.catalog.tableExists("tbl_y"))
spark.catalog.setCurrentDatabase(db)
assert(spark.catalog.tableExists("tbl_y"))
// The table is not found, even though a temp view with the same name exists
assert(!spark.catalog.tableExists(db, "tbl_x"))
}
}
}
test("function exists") {
withTempDatabase { db =>
withUserDefinedFunction("fn1" -> true, s"$db.fn2" -> false) {
// Try to find non-existent functions.
assert(!spark.catalog.functionExists("fn1"))
assert(!spark.catalog.functionExists("fn2"))
assert(!spark.catalog.functionExists(db, "fn2"))
assert(!spark.catalog.functionExists(s"$db.fn2"))
// Create objects.
createTempFunction("fn1")
createFunction("fn2", Some(db))
// Find a temporary function
assert(spark.catalog.functionExists("fn1"))
assert(!spark.catalog.functionExists(db, "fn1"))
// Find a qualified function
assert(spark.catalog.functionExists(db, "fn2"))
assert(spark.catalog.functionExists(s"$db.fn2"))
// Find an unqualified function using the current database
assert(!spark.catalog.functionExists("fn2"))
spark.catalog.setCurrentDatabase(db)
assert(spark.catalog.functionExists("fn2"))
// The function is not found, even though a temp function with the same name exists
assert(!spark.catalog.functionExists(db, "fn1"))
}
}
}
test("createTable with 'path' in options") {
val description = "this is a test table"
withTable("t") {
withTempDir { dir =>
spark.catalog.createTable(
tableName = "t",
source = "json",
schema = new StructType().add("i", "int"),
description = description,
options = Map("path" -> dir.getAbsolutePath))
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.tableType == CatalogTableType.EXTERNAL)
assert(table.storage.locationUri.get == makeQualifiedPath(dir.getAbsolutePath))
assert(table.comment == Some(description))
Seq(1).toDF("i").write.insertInto("t")
assert(dir.exists() && dir.listFiles().nonEmpty)
sql("DROP TABLE t")
// The table path and data files are still there after DROP TABLE when a custom table
// path is specified.
assert(dir.exists() && dir.listFiles().nonEmpty)
}
}
}
test("createTable without 'path' in options") {
withTable("t") {
spark.catalog.createTable(
tableName = "t",
source = "json",
schema = new StructType().add("i", "int"),
options = Map.empty[String, String])
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.tableType == CatalogTableType.MANAGED)
val tablePath = new File(table.storage.locationUri.get)
assert(tablePath.exists() && tablePath.listFiles().isEmpty)
Seq(1).toDF("i").write.insertInto("t")
assert(tablePath.listFiles().nonEmpty)
sql("DROP TABLE t")
// The table path is removed after DROP TABLE when no custom table path is specified.
assert(!tablePath.exists())
}
}
test("clone Catalog") {
// Verify that temp views are cloned into the new session.
assert(spark.catalog.listTables().collect().isEmpty)
createTempTable("my_temp_table")
assert(spark.catalog.listTables().collect().map(_.name).toSet == Set("my_temp_table"))
// inheritance
val forkedSession = spark.cloneSession()
assert(spark ne forkedSession)
assert(spark.catalog ne forkedSession.catalog)
assert(forkedSession.catalog.listTables().collect().map(_.name).toSet == Set("my_temp_table"))
// independence
dropTable("my_temp_table") // drop table in original session
assert(spark.catalog.listTables().collect().map(_.name).toSet == Set())
assert(forkedSession.catalog.listTables().collect().map(_.name).toSet == Set("my_temp_table"))
createTempView(
forkedSession.sessionState.catalog, "fork_table", Range(1, 2, 3, 4), overrideIfExists = true)
assert(spark.catalog.listTables().collect().map(_.name).toSet == Set())
}
test("cacheTable with storage level") {
createTempTable("my_temp_table")
spark.catalog.cacheTable("my_temp_table", StorageLevel.DISK_ONLY)
assert(spark.table("my_temp_table").storageLevel == StorageLevel.DISK_ONLY)
}
test("SPARK-34301: recover partitions of views is not supported") {
createTempTable("my_temp_table")
val errMsg = intercept[AnalysisException] {
spark.catalog.recoverPartitions("my_temp_table")
}.getMessage
assert(errMsg.contains("my_temp_table is a temp view. 'recoverPartitions()' expects a table"))
}
test("qualified name with catalog - create managed table") {
val catalogName = "testcat"
val dbName = "my_db"
val tableName = "my_table"
val tableSchema = new StructType().add("i", "int")
val description = "this is a test table"
val df = createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName,
tableSchema, Map.empty[String, String], description)
assert(df.schema.equals(tableSchema))
val testCatalog =
spark.sessionState.catalogManager.catalog(catalogName).asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(dbName), tableName))
assert(table.schema().equals(tableSchema))
assert(table.properties().get("provider").equals(classOf[FakeV2Provider].getName))
assert(table.properties().get("comment").equals(description))
}
test("qualified name with catalog - create external table") {
withTempDir { dir =>
val catalogName = "testcat"
val dbName = "my_db"
val tableName = "my_table"
val tableSchema = new StructType().add("i", "int")
val description = "this is a test table"
val df = createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName,
tableSchema, Map("path" -> dir.getAbsolutePath), description)
assert(df.schema.equals(tableSchema))
val testCatalog =
spark.sessionState.catalogManager.catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(dbName), tableName))
assert(table.schema().equals(tableSchema))
assert(table.properties().get("provider").equals(classOf[FakeV2Provider].getName))
assert(table.properties().get("comment").equals(description))
assert(table.properties().get("path").equals(dir.getAbsolutePath))
assert(table.properties().get("external").equals("true"))
assert(table.properties().get("location").equals("file:" + dir.getAbsolutePath))
}
}
test("qualified name with catalog - list tables") {
withTempDir { dir =>
val catalogName = "testcat"
val dbName = "my_db"
val tableName = "my_table"
val tableSchema = new StructType().add("i", "int")
val description = "this is a test managed table"
createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName, tableSchema,
Map.empty[String, String], description)
val tableName2 = "my_table2"
val description2 = "this is a test external table"
createTable(tableName2, dbName, catalogName, classOf[FakeV2Provider].getName, tableSchema,
Map("path" -> dir.getAbsolutePath), description2)
val tables = spark.catalog.listTables("testcat.my_db").collect()
assert(tables.size == 2)
val expectedTable1 =
new Table(tableName, catalogName, Array(dbName), description,
CatalogTableType.MANAGED.name, false)
assert(tables.exists(t =>
expectedTable1.name.equals(t.name) && expectedTable1.database.equals(t.database) &&
expectedTable1.description.equals(t.description) &&
expectedTable1.tableType.equals(t.tableType) &&
expectedTable1.isTemporary == t.isTemporary))
val expectedTable2 =
new Table(tableName2, catalogName, Array(dbName), description2,
CatalogTableType.EXTERNAL.name, false)
assert(tables.exists(t =>
expectedTable2.name.equals(t.name) && expectedTable2.database.equals(t.database) &&
expectedTable2.description.equals(t.description) &&
expectedTable2.tableType.equals(t.tableType) &&
expectedTable2.isTemporary == t.isTemporary))
}
}
test("list tables when there is `default` catalog") {
spark.conf.set("spark.sql.catalog.default", classOf[InMemoryCatalog].getName)
assert(spark.catalog.listTables("default").collect().isEmpty)
createTable("my_table1")
createTable("my_table2")
createTempTable("my_temp_table")
assert(spark.catalog.listTables("default").collect().map(_.name).toSet ==
Set("my_table1", "my_table2", "my_temp_table"))
}
test("qualified name with catalog - get table") {
val catalogName = "testcat"
val dbName = "default"
val tableName = "my_table"
val tableSchema = new StructType().add("i", "int")
val description = "this is a test table"
createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName, tableSchema,
Map.empty[String, String], description)
val t = spark.catalog.getTable(Array(catalogName, dbName, tableName).mkString("."))
val expectedTable =
new Table(
tableName,
catalogName,
Array(dbName),
description,
CatalogTableType.MANAGED.name,
false)
assert(expectedTable.toString == t.toString)
// When both the session catalog and testcat contain a table with the same name, the
// table in the session catalog is expected to be returned for a 2-part name.
createTable("my_table")
val t2 = spark.catalog.getTable(Array(dbName, tableName).mkString("."))
assert(t2.catalog == CatalogManager.SESSION_CATALOG_NAME)
}
test("qualified name with catalog - table exists") {
val catalogName = "testcat"
val dbName = "my_db"
val tableName = "my_table"
val tableSchema = new StructType().add("i", "int")
assert(!spark.catalog.tableExists(Array(catalogName, dbName, tableName).mkString(".")))
createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName, tableSchema,
Map.empty[String, String], "")
assert(spark.catalog.tableExists(Array(catalogName, dbName, tableName).mkString(".")))
}
test("SPARK-39810: Catalog.tableExists should handle nested namespace") {
val tableSchema = new StructType().add("i", "int")
val catalogName = "testcat"
val dbName = "my_db2.my_db3"
val tableName = "my_table2"
assert(!spark.catalog.tableExists(Array(catalogName, dbName, tableName).mkString(".")))
createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName, tableSchema,
Map.empty[String, String], "")
assert(spark.catalog.tableExists(Array(catalogName, dbName, tableName).mkString(".")))
}
test("qualified name with catalog - database exists") {
val catalogName = "testcat"
val dbName = "my_db"
assert(!spark.catalog.databaseExists(Array(catalogName, dbName).mkString(".")))
sql(s"CREATE NAMESPACE ${catalogName}.${dbName}")
assert(spark.catalog.databaseExists(Array(catalogName, dbName).mkString(".")))
val catalogName2 = "catalog_not_exists"
assert(!spark.catalog.databaseExists(Array(catalogName2, dbName).mkString(".")))
}
test("SPARK-39506: qualified name with catalog - cache table, isCached and" +
"uncacheTable") {
val tableSchema = new StructType().add("i", "int")
createTable("my_table", "my_db", "testcat", classOf[FakeV2Provider].getName,
tableSchema, Map.empty[String, String], "")
createTable("my_table2", "my_db", "testcat", classOf[FakeV2Provider].getName,
tableSchema, Map.empty[String, String], "")
spark.catalog.cacheTable("testcat.my_db.my_table", StorageLevel.DISK_ONLY)
assert(spark.table("testcat.my_db.my_table").storageLevel == StorageLevel.DISK_ONLY)
assert(spark.catalog.isCached("testcat.my_db.my_table"))
spark.catalog.cacheTable("testcat.my_db.my_table2")
assert(spark.catalog.isCached("testcat.my_db.my_table2"))
spark.catalog.uncacheTable("testcat.my_db.my_table")
assert(!spark.catalog.isCached("testcat.my_db.my_table"))
}
test("SPARK-39506: test setCurrentCatalog, currentCatalog and listCatalogs") {
assert(spark.catalog.listCatalogs().collect().map(c => c.name).toSet ==
Set(CatalogManager.SESSION_CATALOG_NAME))
spark.catalog.setCurrentCatalog("testcat")
assert(spark.catalog.currentCatalog().equals("testcat"))
spark.catalog.setCurrentCatalog("spark_catalog")
assert(spark.catalog.currentCatalog().equals("spark_catalog"))
assert(spark.catalog.listCatalogs().collect().map(c => c.name).toSet ==
Set("testcat", CatalogManager.SESSION_CATALOG_NAME))
assert(spark.catalog.listCatalogs("spark*").collect().map(c => c.name).toSet ==
Set(CatalogManager.SESSION_CATALOG_NAME))
assert(spark.catalog.listCatalogs("spark2*").collect().map(c => c.name).toSet ==
Set.empty)
}
test("SPARK-39583: Make RefreshTable be compatible with 3 layer namespace") {
withTempDir { dir =>
val tableName = "spark_catalog.default.my_table"
sql(s"""
| CREATE TABLE ${tableName}(col STRING) USING TEXT
| LOCATION '${dir.getAbsolutePath}'
|""".stripMargin)
sql(s"""INSERT INTO ${tableName} SELECT 'abc'""".stripMargin)
spark.catalog.cacheTable(tableName)
assert(spark.table(tableName).collect().length == 1)
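// Deleting the underlying files does not affect the cached data until the table is refreshed.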
FileUtils.deleteDirectory(dir)
assert(spark.table(tableName).collect().length == 1)
spark.catalog.refreshTable(tableName)
assert(spark.table(tableName).collect().length == 0)
}
}
test("qualified name with catalogy - get database") {
val catalogsAndDatabases =
Seq(("testcat", "somedb"), ("testcat", "ns.somedb"), ("spark_catalog", "somedb"))
catalogsAndDatabases.foreach { case (catalog, dbName) =>
val qualifiedDb = s"$catalog.$dbName"
sql(s"CREATE NAMESPACE $qualifiedDb COMMENT '$qualifiedDb' LOCATION '/test/location'")
val db = spark.catalog.getDatabase(qualifiedDb)
assert(db.name === dbName)
assert(db.description === qualifiedDb)
assert(db.locationUri === "file:/test/location")
}
// test without qualifier
val name = "testns"
sql(s"CREATE NAMESPACE testcat.$name COMMENT '$name'")
spark.catalog.setCurrentCatalog("testcat")
val db = spark.catalog.getDatabase(name)
assert(db.name === name)
assert(db.description === name)
intercept[AnalysisException](spark.catalog.getDatabase("randomcat.db10"))
}
test("qualified name with catalog - get database, same in hive and testcat") {
// create 'testdb' in hive and testcat
val dbName = "testdb"
sql(s"CREATE NAMESPACE spark_catalog.$dbName COMMENT 'hive database'")
sql(s"CREATE NAMESPACE testcat.$dbName COMMENT 'testcat namespace'")
sql("SET CATALOG testcat")
// should still return the database in Hive
val db = spark.catalog.getDatabase(dbName)
assert(db.name === dbName)
assert(db.description === "hive database")
}
test("get database when there is `default` catalog") {
spark.conf.set("spark.sql.catalog.default", classOf[InMemoryCatalog].getName)
val db = "testdb"
val qualified = s"default.$db"
sql(s"CREATE NAMESPACE $qualified")
assert(spark.catalog.getDatabase(qualified).name === db)
}
test("qualified name with catalog - set current database") {
spark.catalog.setCurrentCatalog("testcat")
// namespace with the same name as catalog
sql("CREATE NAMESPACE testcat.testcat.my_db")
spark.catalog.setCurrentDatabase("testcat.my_db")
assert(spark.catalog.currentDatabase == "testcat.my_db")
// sessionCatalog still reports 'default' as current database
assert(sessionCatalog.getCurrentDatabase == "default")
val e = intercept[AnalysisException] {
spark.catalog.setCurrentDatabase("my_db")
}.getMessage
assert(e.contains("my_db"))
// check that we can fall back to old sessionCatalog
createDatabase("hive_db")
val err = intercept[AnalysisException] {
spark.catalog.setCurrentDatabase("hive_db")
}.getMessage
assert(err.contains("hive_db"))
spark.catalog.setCurrentCatalog("spark_catalog")
spark.catalog.setCurrentDatabase("hive_db")
assert(spark.catalog.currentDatabase == "hive_db")
assert(sessionCatalog.getCurrentDatabase == "hive_db")
val e3 = intercept[AnalysisException] {
spark.catalog.setCurrentDatabase("unknown_db")
}.getMessage
assert(e3.contains("unknown_db"))
}
test("SPARK-39579: qualified name with catalog - listFunctions, getFunction, functionExists") {
createDatabase("my_db1")
createFunction("my_func1", Some("my_db1"))
val functions1a = spark.catalog.listFunctions("my_db1").collect().map(_.name)
val functions1b = spark.catalog.listFunctions("spark_catalog.my_db1").collect().map(_.name)
assert(functions1a.length > 200 && functions1a.contains("my_func1"))
assert(functions1b.length > 200 && functions1b.contains("my_func1"))
// functions1b contains 5 more functions: [<>, ||, !=, case, between]
assert(functions1a.intersect(functions1b) === functions1a)
assert(spark.catalog.functionExists("my_db1.my_func1"))
assert(spark.catalog.functionExists("spark_catalog.my_db1.my_func1"))
val func1a = spark.catalog.getFunction("my_db1.my_func1")
val func1b = spark.catalog.getFunction("spark_catalog.my_db1.my_func1")
assert(func1a.name === func1b.name && func1a.namespace === func1b.namespace &&
func1a.className === func1b.className && func1a.isTemporary === func1b.isTemporary)
assert(func1a.catalog === "spark_catalog" && func1b.catalog === "spark_catalog")
assert(func1a.description === "N/A." && func1b.description === "N/A.")
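// A minimal V2 function to register in testcat; only its metadata is inspected.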
val function: UnboundFunction = new UnboundFunction {
override def bind(inputType: StructType): BoundFunction = new ScalarFunction[Int] {
override def inputTypes(): Array[DataType] = Array(IntegerType)
override def resultType(): DataType = IntegerType
override def name(): String = "my_bound_function"
}
override def description(): String = "hello"
override def name(): String = "my_function"
}
val testCatalog: InMemoryCatalog =
spark.sessionState.catalogManager.catalog("testcat").asInstanceOf[InMemoryCatalog]
testCatalog.createFunction(Identifier.of(Array("my_db2"), "my_func2"), function)
val functions2 = spark.catalog.listFunctions("testcat.my_db2").collect().map(_.name)
assert(functions2.length > 200 && functions2.contains("my_func2"))
assert(spark.catalog.functionExists("testcat.my_db2.my_func2"))
assert(!spark.catalog.functionExists("testcat.my_db2.my_func3"))
val func2 = spark.catalog.getFunction("testcat.my_db2.my_func2")
assert(func2.name === "my_func2" && func2.namespace === Array("my_db2") &&
func2.catalog === "testcat" && func2.description === "hello" &&
func2.isTemporary === false &&
func2.className.startsWith("org.apache.spark.sql.internal.CatalogSuite"))
}
}