/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.v2.jdbc

import java.sql.{Connection, DriverManager}
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val tempDir = Utils.createTempDir()
val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
var conn: java.sql.Connection = null
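
  // Register an "h2" catalog backed by JDBCTableCatalog, so that identifiers such as
  // h2.test.people resolve through the JDBC v2 catalog against the embedded H2 database.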
override def sparkConf: SparkConf = super.sparkConf
.set("spark.sql.catalog.h2", classOf[JDBCTableCatalog].getName)
.set("spark.sql.catalog.h2.url", url)
.set("spark.sql.catalog.h2.driver", "org.h2.Driver")
private def withConnection[T](f: Connection => T): T = {
val conn = DriverManager.getConnection(url, new Properties())
try {
f(conn)
} finally {
conn.close()
}
}
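
  // Create the "test" schema and a "people" table directly in H2 before the tests run.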
override def beforeAll(): Unit = {
super.beforeAll()
Utils.classForName("org.h2.Driver")
withConnection { conn =>
conn.prepareStatement("""CREATE SCHEMA "test"""").executeUpdate()
conn.prepareStatement(
"""CREATE TABLE "test"."people" (name TEXT(32) NOT NULL, id INTEGER NOT NULL)""")
.executeUpdate()
}
}
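
  // Delete the temporary directory that holds the H2 database files.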
override def afterAll(): Unit = {
Utils.deleteRecursively(tempDir)
super.afterAll()
}
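
  // SHOW TABLES lists the tables of a namespace through the JDBC catalog.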
test("show tables") {
checkAnswer(sql("SHOW TABLES IN h2.test"), Seq(Row("test", "people")))
    // Check a non-existent namespace
checkAnswer(sql("SHOW TABLES IN h2.bad_test"), Seq())
}
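
  // DROP TABLE removes the table from H2; dropping a missing table or namespace fails analysis.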
test("drop a table and test whether the table exists") {
withConnection { conn =>
conn.prepareStatement("""CREATE TABLE "test"."to_drop" (id INTEGER)""").executeUpdate()
}
checkAnswer(sql("SHOW TABLES IN h2.test"), Seq(Row("test", "to_drop"), Row("test", "people")))
sql("DROP TABLE h2.test.to_drop")
checkAnswer(sql("SHOW TABLES IN h2.test"), Seq(Row("test", "people")))
Seq(
"h2.test.not_existing_table" -> "Table or view not found: h2.test.not_existing_table",
"h2.bad_test.not_existing_table" -> "Table or view not found: h2.bad_test.not_existing_table"
).foreach { case (table, expectedMsg) =>
val msg = intercept[AnalysisException] {
sql(s"DROP TABLE $table")
}.getMessage
assert(msg.contains(expectedMsg))
}
}
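
  // ALTER TABLE ... RENAME TO, including failures for a missing source table, a missing
  // namespace, and a name clash with an existing table.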
test("rename a table") {
withTable("h2.test.dst_table") {
withConnection { conn =>
conn.prepareStatement("""CREATE TABLE "test"."src_table" (id INTEGER)""").executeUpdate()
}
checkAnswer(
sql("SHOW TABLES IN h2.test"),
Seq(Row("test", "src_table"), Row("test", "people")))
sql("ALTER TABLE h2.test.src_table RENAME TO test.dst_table")
checkAnswer(
sql("SHOW TABLES IN h2.test"),
Seq(Row("test", "dst_table"), Row("test", "people")))
}
    // Rename a non-existent table or namespace
val exp1 = intercept[NoSuchTableException] {
sql(s"ALTER TABLE h2.test.not_existing_table RENAME TO test.dst_table")
}
assert(exp1.getMessage.contains(
"Failed table renaming from test.not_existing_table to test.dst_table"))
assert(exp1.cause.get.getMessage.contains("Table \"not_existing_table\" not found"))
val exp2 = intercept[NoSuchNamespaceException] {
sql(s"ALTER TABLE h2.bad_test.not_existing_table RENAME TO test.dst_table")
}
assert(exp2.getMessage.contains(
"Failed table renaming from bad_test.not_existing_table to test.dst_table"))
assert(exp2.cause.get.getMessage.contains("Schema \"bad_test\" not found"))
// Rename to an existing table
withTable("h2.test.dst_table") {
withConnection { conn =>
conn.prepareStatement("""CREATE TABLE "test"."dst_table" (id INTEGER)""").executeUpdate()
}
withTable("h2.test.src_table") {
withConnection { conn =>
conn.prepareStatement("""CREATE TABLE "test"."src_table" (id INTEGER)""").executeUpdate()
}
val exp = intercept[TableAlreadyExistsException] {
sql("ALTER TABLE h2.test.src_table RENAME TO test.dst_table")
}
assert(exp.getMessage.contains(
"Failed table renaming from test.src_table to test.dst_table"))
assert(exp.cause.get.getMessage.contains("Table \"dst_table\" already exists"))
}
}
}
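
  // Loading a table reflects the schema reported by H2 (unquoted identifiers are upper-cased).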
test("load a table") {
val t = spark.table("h2.test.people")
val expectedSchema = new StructType()
.add("NAME", StringType)
.add("ID", IntegerType)
assert(t.schema === expectedSchema)
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
val msg = intercept[AnalysisException] {
spark.table(table).schema
}.getMessage
assert(msg.contains("Table or view not found"))
}
}
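
  // Illustrative sketch: rows can also be read back through the catalog with plain SQL;
  // the fixture table created in beforeAll has no rows yet.
  test("read an empty table through the catalog (illustrative)") {
    checkAnswer(sql("SELECT name, id FROM h2.test.people"), Seq())
  }

  // CREATE TABLE via the catalog; a duplicate table name or a missing namespace fails.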
test("create a table") {
withTable("h2.test.new_table") {
// TODO (SPARK-32427): Omit USING in CREATE TABLE
sql("CREATE TABLE h2.test.new_table(i INT, j STRING) USING _")
checkAnswer(
sql("SHOW TABLES IN h2.test"),
Seq(Row("test", "people"), Row("test", "new_table")))
}
withTable("h2.test.new_table") {
sql("CREATE TABLE h2.test.new_table(i INT, j STRING) USING _")
val msg = intercept[AnalysisException] {
sql("CREATE TABLE h2.test.new_table(i INT, j STRING) USING _")
}.getMessage
assert(msg.contains("Table test.new_table already exists"))
}
val exp = intercept[NoSuchNamespaceException] {
sql("CREATE TABLE h2.bad_test.new_table(i INT, j STRING) USING _")
}
assert(exp.getMessage.contains("Failed table creation: bad_test.new_table"))
assert(exp.cause.get.getMessage.contains("Schema \"bad_test\" not found"))
}
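
  // ALTER TABLE ... ADD COLUMNS appends columns to the schema in declaration order.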
test("ALTER TABLE ... add column") {
val tableName = "h2.test.alt_table"
withTable(tableName) {
sql(s"CREATE TABLE $tableName (ID INTEGER) USING _")
sql(s"ALTER TABLE $tableName ADD COLUMNS (C1 INTEGER, C2 STRING)")
var t = spark.table(tableName)
var expectedSchema = new StructType()
.add("ID", IntegerType)
.add("C1", IntegerType)
.add("C2", StringType)
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $tableName ADD COLUMNS (c3 DOUBLE)")
t = spark.table(tableName)
expectedSchema = expectedSchema.add("c3", DoubleType)
assert(t.schema === expectedSchema)
      // Add an already existing column
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName ADD COLUMNS (c3 DOUBLE)")
}.getMessage
assert(msg.contains("Cannot add column, because c3 already exists"))
}
    // Add a column to a non-existent table or namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $table ADD COLUMNS (C4 STRING)")
}.getMessage
assert(msg.contains("Table not found"))
}
}
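
  // ALTER TABLE ... RENAME COLUMN, including a clash with an existing column name.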
test("ALTER TABLE ... rename column") {
val tableName = "h2.test.alt_table"
withTable(tableName) {
sql(s"CREATE TABLE $tableName (id INTEGER, C0 INTEGER) USING _")
sql(s"ALTER TABLE $tableName RENAME COLUMN id TO C")
val t = spark.table(tableName)
val expectedSchema = new StructType()
.add("C", IntegerType)
.add("C0", IntegerType)
assert(t.schema === expectedSchema)
      // Rename to an already existing column
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName RENAME COLUMN C TO C0")
}.getMessage
assert(msg.contains("Cannot rename column, because C0 already exists"))
}
    // Rename a column in a non-existent table or namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $table RENAME COLUMN ID TO C")
}.getMessage
assert(msg.contains("Table not found"))
}
}
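
  // ALTER TABLE ... DROP COLUMN, including a missing column and a missing table.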
test("ALTER TABLE ... drop column") {
val tableName = "h2.test.alt_table"
withTable(tableName) {
sql(s"CREATE TABLE $tableName (C1 INTEGER, C2 INTEGER, c3 INTEGER) USING _")
sql(s"ALTER TABLE $tableName DROP COLUMN C1")
sql(s"ALTER TABLE $tableName DROP COLUMN c3")
val t = spark.table(tableName)
val expectedSchema = new StructType().add("C2", IntegerType)
assert(t.schema === expectedSchema)
      // Drop a non-existent column
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName DROP COLUMN bad_column")
}.getMessage
assert(msg.contains("Cannot delete missing field bad_column in test.alt_table schema"))
}
    // Drop a column from a non-existent table or namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $table DROP COLUMN C1")
}.getMessage
assert(msg.contains("Table not found"))
}
}
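
  // ALTER TABLE ... ALTER COLUMN ... TYPE, including a missing column and an unparsable type.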
test("ALTER TABLE ... update column type") {
val tableName = "h2.test.alt_table"
withTable(tableName) {
sql(s"CREATE TABLE $tableName (ID INTEGER, deptno INTEGER) USING _")
sql(s"ALTER TABLE $tableName ALTER COLUMN id TYPE DOUBLE")
sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE DOUBLE")
val t = spark.table(tableName)
val expectedSchema = new StructType().add("ID", DoubleType).add("deptno", DoubleType)
assert(t.schema === expectedSchema)
      // Update a non-existent column
val msg1 = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE DOUBLE")
}.getMessage
assert(msg1.contains("Cannot update missing field bad_column in test.alt_table schema"))
      // Update a column to an unsupported type
val msg2 = intercept[ParseException] {
sql(s"ALTER TABLE $tableName ALTER COLUMN id TYPE bad_type")
}.getMessage
assert(msg2.contains("DataType bad_type is not supported"))
}
    // Update the column type in a non-existent table or namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $table ALTER COLUMN id TYPE DOUBLE")
}.getMessage
assert(msg.contains("Table not found"))
}
}
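
  // ALTER TABLE ... ALTER COLUMN ... DROP NOT NULL makes the columns nullable.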
test("ALTER TABLE ... update column nullability") {
val tableName = "h2.test.alt_table"
withTable(tableName) {
sql(s"CREATE TABLE $tableName (ID INTEGER NOT NULL, deptno INTEGER NOT NULL) USING _")
sql(s"ALTER TABLE $tableName ALTER COLUMN ID DROP NOT NULL")
sql(s"ALTER TABLE $tableName ALTER COLUMN deptno DROP NOT NULL")
val t = spark.table(tableName)
val expectedSchema = new StructType()
.add("ID", IntegerType, nullable = true).add("deptno", IntegerType, nullable = true)
assert(t.schema === expectedSchema)
      // Update nullability of a non-existent column
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT NULL")
}.getMessage
assert(msg.contains("Cannot update missing field bad_column in test.alt_table"))
}
    // Update column nullability in a non-existent table or namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $table ALTER COLUMN ID DROP NOT NULL")
}.getMessage
assert(msg.contains("Table not found"))
}
}
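
  // Column comments are not supported by this catalog, so altering one surfaces an
  // "Unsupported TableChange" error from the JDBC layer.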
test("ALTER TABLE ... update column comment not supported") {
val tableName = "h2.test.alt_table"
withTable(tableName) {
sql(s"CREATE TABLE $tableName (ID INTEGER) USING _")
val exp = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName ALTER COLUMN ID COMMENT 'test'")
}
assert(exp.getMessage.contains("Failed table altering: test.alt_table"))
assert(exp.cause.get.getMessage.contains("Unsupported TableChange"))
      // Update the comment of a non-existent column
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column COMMENT 'test'")
}.getMessage
assert(msg.contains("Cannot update missing field bad_column in test.alt_table"))
}
    // Update column comments in a non-existent table or namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $table ALTER COLUMN ID COMMENT 'test'")
}.getMessage
assert(msg.contains("Table not found"))
}
}
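
  // Column resolution in ALTER TABLE honors spark.sql.caseSensitive: an exact-case match
  // is required when it is true, while any case matches when it is false.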
test("ALTER TABLE case sensitivity") {
val tableName = "h2.test.alt_table"
withTable(tableName) {
sql(s"CREATE TABLE $tableName (c1 INTEGER NOT NULL, c2 INTEGER) USING _")
var t = spark.table(tableName)
var expectedSchema = new StructType().add("c1", IntegerType).add("c2", IntegerType)
assert(t.schema === expectedSchema)
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3")
}.getMessage
assert(msg.contains("Cannot rename missing field C2 in test.alt_table schema"))
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3")
expectedSchema = new StructType().add("c1", IntegerType).add("c3", IntegerType)
t = spark.table(tableName)
assert(t.schema === expectedSchema)
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName DROP COLUMN C3")
}.getMessage
assert(msg.contains("Cannot delete missing field C3 in test.alt_table schema"))
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(s"ALTER TABLE $tableName DROP COLUMN C3")
expectedSchema = new StructType().add("c1", IntegerType)
t = spark.table(tableName)
assert(t.schema === expectedSchema)
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE")
}.getMessage
assert(msg.contains("Cannot update missing field C1 in test.alt_table schema"))
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE")
expectedSchema = new StructType().add("c1", DoubleType)
t = spark.table(tableName)
assert(t.schema === expectedSchema)
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL")
}.getMessage
assert(msg.contains("Cannot update missing field C1 in test.alt_table schema"))
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL")
expectedSchema = new StructType().add("c1", DoubleType, nullable = true)
t = spark.table(tableName)
assert(t.schema === expectedSchema)
}
}
}
}