blob: e4cc88cec0f5ed9452aa9e40e1f5401cc7e82d67 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc.v2
import java.util
import java.util.Collections
import scala.jdk.CollectionConverters._
import org.apache.logging.log4j.Level
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
import org.apache.spark.sql.connector.catalog.{Column, Identifier, NamespaceChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.tags.DockerTest
@DockerTest
private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerIntegrationFunSuite {
val catalog = new JDBCTableCatalog()
private val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String]
private val columns: Array[Column] = Array(
Column.create("id", IntegerType),
Column.create("data", StringType))
def builtinNamespaces: Array[Array[String]]
def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
Array(namespace) ++ builtinNamespaces
}
def supportsSchemaComment: Boolean = true
def supportsDropSchemaCascade: Boolean = true
def supportsDropSchemaRestrict: Boolean = true
test("listNamespaces: basic behavior") {
val commentMap = if (supportsSchemaComment) {
Map("comment" -> "test comment")
} else {
Map.empty[String, String]
}
catalog.createNamespace(Array("foo"), commentMap.asJava)
assert(catalog.listNamespaces().map(_.toSet).toSet ===
listNamespaces(Array("foo")).map(_.toSet).toSet)
assert(catalog.listNamespaces(Array("foo")) === Array())
assert(catalog.namespaceExists(Array("foo")) === true)
if (supportsSchemaComment) {
val logAppender = new LogAppender("catalog comment")
withLogAppender(logAppender) {
catalog.alterNamespace(Array("foo"), NamespaceChange
.setProperty("comment", "comment for foo"))
catalog.alterNamespace(Array("foo"), NamespaceChange.removeProperty("comment"))
}
val createCommentWarning = logAppender.loggingEvents
.filter(_.getLevel == Level.WARN)
.map(_.getMessage.getFormattedMessage)
.exists(_.contains("catalog comment"))
assert(createCommentWarning === false)
}
if (supportsDropSchemaRestrict) {
catalog.dropNamespace(Array("foo"), cascade = false)
} else {
catalog.dropNamespace(Array("foo"), cascade = true)
}
assert(catalog.namespaceExists(Array("foo")) === false)
assert(catalog.listNamespaces() === builtinNamespaces)
val e = intercept[AnalysisException] {
catalog.listNamespaces(Array("foo"))
}
checkError(e,
errorClass = "SCHEMA_NOT_FOUND",
parameters = Map("schemaName" -> "`foo`"))
}
test("Drop namespace") {
val ident1 = Identifier.of(Array("foo"), "tab")
// Drop empty namespace without cascade
val commentMap = if (supportsSchemaComment) {
Map("comment" -> "test comment")
} else {
Map.empty[String, String]
}
catalog.createNamespace(Array("foo"), commentMap.asJava)
assert(catalog.namespaceExists(Array("foo")) === true)
if (supportsDropSchemaRestrict) {
catalog.dropNamespace(Array("foo"), cascade = false)
} else {
catalog.dropNamespace(Array("foo"), cascade = true)
}
assert(catalog.namespaceExists(Array("foo")) === false)
// Drop non empty namespace without cascade
catalog.createNamespace(Array("foo"), commentMap.asJava)
assert(catalog.namespaceExists(Array("foo")) === true)
catalog.createTable(ident1, columns, Array.empty[Transform], emptyProps)
if (supportsDropSchemaRestrict) {
intercept[NonEmptyNamespaceException] {
catalog.dropNamespace(Array("foo"), cascade = false)
}
}
// Drop non empty namespace with cascade
if (supportsDropSchemaCascade) {
assert(catalog.namespaceExists(Array("foo")) === true)
catalog.dropNamespace(Array("foo"), cascade = true)
assert(catalog.namespaceExists(Array("foo")) === false)
}
}
}