blob: f35eb41fe2ce1d5351fdd3e2e0d874251ab5259b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}
/**
* Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
* to the corresponding v1 or v2 commands if the resolved catalog is the session catalog.
*
* We can remove this rule once we implement all the catalog functionality in `V2SessionCatalog`.
*/
class ResolveSessionCatalog(
val catalogManager: CatalogManager,
isTempView: Seq[String] => Boolean,
isTempFunction: String => Boolean)
extends Rule[LogicalPlan] with LookupCatalog {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
/**
 * Rewrites each resolved statement bottom-up. For identifiers that resolve to the
 * session catalog, the statement is converted to the corresponding v1 command when
 * the target is a `V1Table` (or a view), and to a v2 plan built from `TableChange`s
 * otherwise. Statements that only exist in v1 (ANALYZE, MSCK REPAIR, ...) fail
 * analysis unless the name resolves to a v1 table (or temp view where supported).
 */
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
  case AlterTableAddColumnsStatement(
      nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
    cols.foreach(c => failNullType(c.dataType))
    loadTable(catalog, tbl.asIdentifier).collect {
      case v1Table: V1Table =>
        if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
          cols.foreach(c => failCharType(c.dataType))
        }
        cols.foreach { c =>
          assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand")
          if (!c.nullable) {
            throw new AnalysisException(
              "ADD COLUMN with v1 tables cannot specify NOT NULL.")
          }
        }
        AlterTableAddColumnsCommand(tbl.asTableIdentifier, cols.map(convertToStructField))
    }.getOrElse {
      cols.foreach(c => failCharType(c.dataType))
      val changes = cols.map { col =>
        TableChange.addColumn(
          col.name.toArray,
          col.dataType,
          col.nullable,
          col.comment.orNull,
          col.position.orNull)
      }
      createAlterTable(nameParts, catalog, tbl, changes)
    }

  case AlterTableReplaceColumnsStatement(
      nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
    cols.foreach(c => failNullType(c.dataType))
    val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
      case Some(_: V1Table) =>
        throw new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.")
      case Some(table) =>
        cols.foreach(c => failCharType(c.dataType))
        // REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
        val deleteChanges = table.schema.fieldNames.map { name =>
          TableChange.deleteColumn(Array(name))
        }
        val addChanges = cols.map { col =>
          TableChange.addColumn(
            col.name.toArray,
            col.dataType,
            col.nullable,
            col.comment.orNull,
            col.position.orNull)
        }
        deleteChanges ++ addChanges
      case None => Seq() // Unresolved table will be handled in CheckAnalysis.
    }
    createAlterTable(nameParts, catalog, tbl, changes)

  case a @ AlterTableAlterColumnStatement(
      nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
    a.dataType.foreach(failNullType)
    loadTable(catalog, tbl.asIdentifier).collect {
      case v1Table: V1Table =>
        if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
          a.dataType.foreach(failCharType)
        }
        // v1 ALTER COLUMN only supports a single, top-level, nullable column
        // without position changes; reject everything else up front.
        if (a.column.length > 1) {
          throw new AnalysisException(
            "ALTER COLUMN with qualified column is only supported with v2 tables.")
        }
        if (a.nullable.isDefined) {
          throw new AnalysisException(
            "ALTER COLUMN with v1 tables cannot specify NOT NULL.")
        }
        if (a.position.isDefined) {
          // NOTE: dropped a redundant `"" +` prefix from this message; the text is unchanged.
          throw new AnalysisException(
            "ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.")
        }
        val builder = new MetadataBuilder
        // Add comment to metadata. `foreach` (not `map`) since only the side effect matters.
        a.comment.foreach(c => builder.putString("comment", c))
        val colName = a.column(0)
        // When no new type is given, keep the column's current type from the v1 schema.
        val dataType = a.dataType.getOrElse {
          v1Table.schema.findNestedField(Seq(colName), resolver = conf.resolver)
            .map(_._2.dataType)
            .getOrElse {
              throw new AnalysisException(
                s"ALTER COLUMN cannot find column ${quoteIfNeeded(colName)} in v1 table. " +
                  s"Available: ${v1Table.schema.fieldNames.mkString(", ")}")
            }
        }
        // Add Hive type string to metadata.
        val cleanedDataType = HiveStringType.replaceCharType(dataType)
        if (dataType != cleanedDataType) {
          builder.putString(HIVE_TYPE_STRING, dataType.catalogString)
        }
        val newColumn = StructField(
          colName,
          cleanedDataType,
          nullable = true,
          builder.build())
        AlterTableChangeColumnCommand(tbl.asTableIdentifier, colName, newColumn)
    }.getOrElse {
      a.dataType.foreach(failCharType)
      val colName = a.column.toArray
      val typeChange = a.dataType.map { newDataType =>
        TableChange.updateColumnType(colName, newDataType)
      }
      val nullabilityChange = a.nullable.map { nullable =>
        TableChange.updateColumnNullability(colName, nullable)
      }
      val commentChange = a.comment.map { newComment =>
        TableChange.updateColumnComment(colName, newComment)
      }
      val positionChange = a.position.map { newPosition =>
        TableChange.updateColumnPosition(colName, newPosition)
      }
      createAlterTable(
        nameParts,
        catalog,
        tbl,
        typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)
    }

  case AlterTableRenameColumnStatement(
      nameParts @ SessionCatalogAndTable(catalog, tbl), col, newName) =>
    loadTable(catalog, tbl.asIdentifier).collect {
      case _: V1Table =>
        throw new AnalysisException("RENAME COLUMN is only supported with v2 tables.")
    }.getOrElse {
      val changes = Seq(TableChange.renameColumn(col.toArray, newName))
      createAlterTable(nameParts, catalog, tbl, changes)
    }

  case AlterTableDropColumnsStatement(
      nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
    loadTable(catalog, tbl.asIdentifier).collect {
      case _: V1Table =>
        throw new AnalysisException("DROP COLUMN is only supported with v2 tables.")
    }.getOrElse {
      val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
      createAlterTable(nameParts, catalog, tbl, changes)
    }

  case AlterTableSetPropertiesStatement(
      nameParts @ SessionCatalogAndTable(catalog, tbl), props) =>
    loadTable(catalog, tbl.asIdentifier).collect {
      case _: V1Table =>
        AlterTableSetPropertiesCommand(tbl.asTableIdentifier, props, isView = false)
    }.getOrElse {
      val changes = props.map { case (key, value) =>
        TableChange.setProperty(key, value)
      }.toSeq
      createAlterTable(nameParts, catalog, tbl, changes)
    }

  case AlterTableUnsetPropertiesStatement(
      nameParts @ SessionCatalogAndTable(catalog, tbl), keys, ifExists) =>
    loadTable(catalog, tbl.asIdentifier).collect {
      case _: V1Table =>
        AlterTableUnsetPropertiesCommand(
          tbl.asTableIdentifier, keys, ifExists, isView = false)
    }.getOrElse {
      val changes = keys.map(key => TableChange.removeProperty(key))
      createAlterTable(nameParts, catalog, tbl, changes)
    }

  case AlterTableSetLocationStatement(
      nameParts @ SessionCatalogAndTable(catalog, tbl), partitionSpec, newLoc) =>
    loadTable(catalog, tbl.asIdentifier).collect {
      case _: V1Table =>
        AlterTableSetLocationCommand(tbl.asTableIdentifier, partitionSpec, newLoc)
    }.getOrElse {
      if (partitionSpec.nonEmpty) {
        throw new AnalysisException(
          "ALTER TABLE SET LOCATION does not support partition for v2 tables.")
      }
      val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, newLoc))
      createAlterTable(nameParts, catalog, tbl, changes)
    }

  // ALTER VIEW should always use v1 command if the resolved catalog is session catalog.
  case AlterViewSetPropertiesStatement(SessionCatalogAndTable(_, tbl), props) =>
    AlterTableSetPropertiesCommand(tbl.asTableIdentifier, props, isView = true)

  case AlterViewUnsetPropertiesStatement(SessionCatalogAndTable(_, tbl), keys, ifExists) =>
    AlterTableUnsetPropertiesCommand(tbl.asTableIdentifier, keys, ifExists, isView = true)

  // Namespace commands: the session catalog only supports single-part database names.
  case d @ DescribeNamespace(SessionCatalogAndNamespace(_, ns), _) =>
    if (ns.length != 1) {
      throw new AnalysisException(
        s"The database name is not valid: ${ns.quoted}")
    }
    DescribeDatabaseCommand(ns.head, d.extended)

  case AlterNamespaceSetProperties(SessionCatalogAndNamespace(_, ns), properties) =>
    if (ns.length != 1) {
      throw new AnalysisException(
        s"The database name is not valid: ${ns.quoted}")
    }
    AlterDatabasePropertiesCommand(ns.head, properties)

  case AlterNamespaceSetLocation(SessionCatalogAndNamespace(_, ns), location) =>
    if (ns.length != 1) {
      throw new AnalysisException(
        s"The database name is not valid: ${ns.quoted}")
    }
    AlterDatabaseSetLocationCommand(ns.head, location)

  // v1 RENAME TABLE supports temp view.
  case RenameTableStatement(TempViewOrV1Table(oldName), newName, isView) =>
    AlterTableRenameCommand(oldName.asTableIdentifier, newName.asTableIdentifier, isView)

  case DescribeRelation(r @ ResolvedTable(_, ident, _: V1Table), partitionSpec, isExtended)
      if isSessionCatalog(r.catalog) =>
    DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended)

  // Use v1 command to describe (temp) view, as v2 catalog doesn't support view yet.
  case DescribeRelation(ResolvedView(ident, _), partitionSpec, isExtended) =>
    DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended)

  case DescribeColumn(r @ ResolvedTable(_, _, _: V1Table), colNameParts, isExtended)
      if isSessionCatalog(r.catalog) =>
    DescribeColumnCommand(r.identifier.asTableIdentifier, colNameParts, isExtended)

  case DescribeColumn(ResolvedView(ident, _), colNameParts, isExtended) =>
    DescribeColumnCommand(ident.asTableIdentifier, colNameParts, isExtended)

  // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
  // session catalog and the table provider is not v2.
  case c @ CreateTableStatement(
      SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
    assertNoNullTypeInSchema(c.tableSchema)
    val provider = c.provider.getOrElse(conf.defaultDataSourceName)
    if (!isV2Provider(provider)) {
      if (!DDLUtils.isHiveTable(Some(provider))) {
        assertNoCharTypeInSchema(c.tableSchema)
      }
      val tableDesc = buildCatalogTable(tbl.asTableIdentifier, c.tableSchema,
        c.partitioning, c.bucketSpec, c.properties, provider, c.options, c.location,
        c.comment, c.ifNotExists)
      val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
      CreateTable(tableDesc, mode, None)
    } else {
      assertNoCharTypeInSchema(c.tableSchema)
      CreateV2Table(
        catalog.asTableCatalog,
        tbl.asIdentifier,
        c.tableSchema,
        // convert the bucket spec and add it as a transform
        c.partitioning ++ c.bucketSpec.map(_.asTransform),
        convertTableProperties(c.properties, c.options, c.location, c.comment, Some(provider)),
        ignoreIfExists = c.ifNotExists)
    }

  case c @ CreateTableAsSelectStatement(
      SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
    // The query schema is only checkable once the SELECT part has been resolved.
    if (c.asSelect.resolved) {
      assertNoNullTypeInSchema(c.asSelect.schema)
    }
    val provider = c.provider.getOrElse(conf.defaultDataSourceName)
    if (!isV2Provider(provider)) {
      val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
        c.partitioning, c.bucketSpec, c.properties, provider, c.options, c.location,
        c.comment, c.ifNotExists)
      val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
      CreateTable(tableDesc, mode, Some(c.asSelect))
    } else {
      CreateTableAsSelect(
        catalog.asTableCatalog,
        tbl.asIdentifier,
        // convert the bucket spec and add it as a transform
        c.partitioning ++ c.bucketSpec.map(_.asTransform),
        c.asSelect,
        convertTableProperties(c.properties, c.options, c.location, c.comment, Some(provider)),
        writeOptions = c.writeOptions,
        ignoreIfExists = c.ifNotExists)
    }

  case RefreshTable(r @ ResolvedTable(_, _, _: V1Table)) if isSessionCatalog(r.catalog) =>
    RefreshTableCommand(r.identifier.asTableIdentifier)

  case RefreshTable(r: ResolvedView) =>
    RefreshTableCommand(r.identifier.asTableIdentifier)

  // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
  // session catalog and the table provider is not v2.
  case c @ ReplaceTableStatement(
      SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
    assertNoNullTypeInSchema(c.tableSchema)
    val provider = c.provider.getOrElse(conf.defaultDataSourceName)
    if (!isV2Provider(provider)) {
      throw new AnalysisException("REPLACE TABLE is only supported with v2 tables.")
    } else {
      assertNoCharTypeInSchema(c.tableSchema)
      ReplaceTable(
        catalog.asTableCatalog,
        tbl.asIdentifier,
        c.tableSchema,
        // convert the bucket spec and add it as a transform
        c.partitioning ++ c.bucketSpec.map(_.asTransform),
        convertTableProperties(c.properties, c.options, c.location, c.comment, Some(provider)),
        orCreate = c.orCreate)
    }

  case c @ ReplaceTableAsSelectStatement(
      SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
    if (c.asSelect.resolved) {
      assertNoNullTypeInSchema(c.asSelect.schema)
    }
    val provider = c.provider.getOrElse(conf.defaultDataSourceName)
    if (!isV2Provider(provider)) {
      throw new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2 tables.")
    } else {
      ReplaceTableAsSelect(
        catalog.asTableCatalog,
        tbl.asIdentifier,
        // convert the bucket spec and add it as a transform
        c.partitioning ++ c.bucketSpec.map(_.asTransform),
        c.asSelect,
        convertTableProperties(c.properties, c.options, c.location, c.comment, Some(provider)),
        writeOptions = c.writeOptions,
        orCreate = c.orCreate)
    }

  case DropTable(
      r @ ResolvedTable(_, _, _: V1Table), ifExists, purge) if isSessionCatalog(r.catalog) =>
    DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = false, purge = purge)

  // v1 DROP TABLE supports temp view.
  case DropTable(r: ResolvedView, ifExists, purge) =>
    if (!r.isTemp) {
      throw new AnalysisException(
        "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
    }
    DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = false, purge = purge)

  // v1 DROP TABLE supports temp view.
  case DropViewStatement(TempViewOrV1Table(name), ifExists) =>
    DropTableCommand(name.asTableIdentifier, ifExists, isView = true, purge = false)

  case c @ CreateNamespaceStatement(CatalogAndNamespace(catalog, ns), _, _)
      if isSessionCatalog(catalog) =>
    if (ns.length != 1) {
      throw new AnalysisException(
        s"The database name is not valid: ${ns.quoted}")
    }
    // Comment/location are carried as reserved properties; strip them before
    // passing the remaining user properties to the v1 command.
    val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
    val location = c.properties.get(SupportsNamespaces.PROP_LOCATION)
    val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
    CreateDatabaseCommand(ns.head, c.ifNotExists, location, comment, newProperties)

  case d @ DropNamespace(SessionCatalogAndNamespace(_, ns), _, _) =>
    if (ns.length != 1) {
      throw new AnalysisException(
        s"The database name is not valid: ${ns.quoted}")
    }
    DropDatabaseCommand(ns.head, d.ifExists, d.cascade)

  case ShowTables(SessionCatalogAndNamespace(_, ns), pattern) =>
    assert(ns.nonEmpty)
    if (ns.length != 1) {
      throw new AnalysisException(
        s"The database name is not valid: ${ns.quoted}")
    }
    ShowTablesCommand(Some(ns.head), pattern)

  case ShowTableStatement(ns, pattern, partitionsSpec) =>
    // Renamed the inner binding so it no longer shadows the outer `ns`.
    val db = ns match {
      case Some(nsParts) if nsParts.length != 1 =>
        throw new AnalysisException(
          s"The database name is not valid: ${nsParts.quoted}")
      case _ => ns.map(_.head)
    }
    // NOTE(review): the literal `true` appears to be an "extended output" flag of
    // ShowTablesCommand -- confirm against its signature.
    ShowTablesCommand(db, Some(pattern), true, partitionsSpec)

  case AnalyzeTableStatement(tbl, partitionSpec, noScan) =>
    val v1TableName = parseV1Table(tbl, "ANALYZE TABLE")
    if (partitionSpec.isEmpty) {
      AnalyzeTableCommand(v1TableName.asTableIdentifier, noScan)
    } else {
      AnalyzePartitionCommand(v1TableName.asTableIdentifier, partitionSpec, noScan)
    }

  case AnalyzeColumnStatement(tbl, columnNames, allColumns) =>
    val v1TableName = parseTempViewOrV1Table(tbl, "ANALYZE TABLE")
    AnalyzeColumnCommand(v1TableName.asTableIdentifier, columnNames, allColumns)

  case RepairTableStatement(tbl) =>
    val v1TableName = parseV1Table(tbl, "MSCK REPAIR TABLE")
    AlterTableRecoverPartitionsCommand(
      v1TableName.asTableIdentifier,
      "MSCK REPAIR TABLE")

  case LoadDataStatement(tbl, path, isLocal, isOverwrite, partition) =>
    val v1TableName = parseV1Table(tbl, "LOAD DATA")
    LoadDataCommand(
      v1TableName.asTableIdentifier,
      path,
      isLocal,
      isOverwrite,
      partition)

  case ShowCreateTableStatement(tbl, asSerde) if !asSerde =>
    val name = parseTempViewOrV1Table(tbl, "SHOW CREATE TABLE")
    ShowCreateTableCommand(name.asTableIdentifier)

  case ShowCreateTableStatement(tbl, asSerde) if asSerde =>
    val v1TableName = parseV1Table(tbl, "SHOW CREATE TABLE AS SERDE")
    ShowCreateTableAsSerdeCommand(v1TableName.asTableIdentifier)

  case CacheTableStatement(tbl, plan, isLazy, options) =>
    val name = if (plan.isDefined) {
      // CACHE TABLE ... AS SELECT creates a temp view with the input query.
      // Temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
      tbl
    } else {
      parseTempViewOrV1Table(tbl, "CACHE TABLE")
    }
    CacheTableCommand(name.asTableIdentifier, plan, isLazy, options)

  case UncacheTableStatement(tbl, ifExists) =>
    val name = parseTempViewOrV1Table(tbl, "UNCACHE TABLE")
    UncacheTableCommand(name.asTableIdentifier, ifExists)

  case TruncateTableStatement(tbl, partitionSpec) =>
    val v1TableName = parseV1Table(tbl, "TRUNCATE TABLE")
    TruncateTableCommand(
      v1TableName.asTableIdentifier,
      partitionSpec)

  case ShowPartitionsStatement(tbl, partitionSpec) =>
    val v1TableName = parseV1Table(tbl, "SHOW PARTITIONS")
    ShowPartitionsCommand(
      v1TableName.asTableIdentifier,
      partitionSpec)

  case ShowColumnsStatement(tbl, ns) =>
    if (ns.isDefined && ns.get.length > 1) {
      throw new AnalysisException(
        s"Namespace name should have only one part if specified: ${ns.get.quoted}")
    }
    // Use namespace only if table name doesn't specify it. If namespace is already specified
    // in the table name, it's checked against the given namespace below.
    val nameParts = if (ns.isDefined && tbl.length == 1) {
      ns.get ++ tbl
    } else {
      tbl
    }
    val sql = "SHOW COLUMNS"
    val v1TableName = parseTempViewOrV1Table(nameParts, sql).asTableIdentifier
    val resolver = conf.resolver
    val db = ns match {
      case Some(db) if v1TableName.database.exists(!resolver(_, db.head)) =>
        throw new AnalysisException(
          s"SHOW COLUMNS with conflicting databases: " +
            s"'${db.head}' != '${v1TableName.database.get}'")
      case _ => ns.map(_.head)
    }
    ShowColumnsCommand(db, v1TableName)

  case AlterTableRecoverPartitionsStatement(tbl) =>
    val v1TableName = parseV1Table(tbl, "ALTER TABLE RECOVER PARTITIONS")
    AlterTableRecoverPartitionsCommand(
      v1TableName.asTableIdentifier,
      "ALTER TABLE RECOVER PARTITIONS")

  case AlterTableAddPartitionStatement(tbl, partitionSpecsAndLocs, ifNotExists) =>
    val v1TableName = parseV1Table(tbl, "ALTER TABLE ADD PARTITION")
    AlterTableAddPartitionCommand(
      v1TableName.asTableIdentifier,
      partitionSpecsAndLocs,
      ifNotExists)

  case AlterTableRenamePartitionStatement(tbl, from, to) =>
    val v1TableName = parseV1Table(tbl, "ALTER TABLE RENAME PARTITION")
    AlterTableRenamePartitionCommand(
      v1TableName.asTableIdentifier,
      from,
      to)

  case AlterTableDropPartitionStatement(tbl, specs, ifExists, purge, retainData) =>
    val v1TableName = parseV1Table(tbl, "ALTER TABLE DROP PARTITION")
    AlterTableDropPartitionCommand(
      v1TableName.asTableIdentifier,
      specs,
      ifExists,
      purge,
      retainData)

  case AlterTableSerDePropertiesStatement(tbl, serdeClassName, serdeProperties, partitionSpec) =>
    val v1TableName = parseV1Table(tbl, "ALTER TABLE SerDe Properties")
    AlterTableSerDePropertiesCommand(
      v1TableName.asTableIdentifier,
      serdeClassName,
      serdeProperties,
      partitionSpec)

  case AlterViewAsStatement(name, originalText, query) =>
    val viewName = parseTempViewOrV1Table(name, "ALTER VIEW QUERY")
    AlterViewAsCommand(
      viewName.asTableIdentifier,
      originalText,
      query)

  case CreateViewStatement(
      tbl, userSpecifiedColumns, comment, properties,
      originalText, child, allowExisting, replace, viewType) =>
    val v1TableName = if (viewType != PersistedView) {
      // temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
      tbl
    } else {
      parseV1Table(tbl, "CREATE VIEW")
    }
    CreateViewCommand(
      v1TableName.asTableIdentifier,
      userSpecifiedColumns,
      comment,
      properties,
      originalText,
      child,
      allowExisting,
      replace,
      viewType)

  case ShowViews(resolved: ResolvedNamespace, pattern) =>
    resolved match {
      case SessionCatalogAndNamespace(_, ns) =>
        // Fallback to v1 ShowViewsCommand since there is no view API in v2 catalog
        assert(ns.nonEmpty)
        if (ns.length != 1) {
          throw new AnalysisException(s"The database name is not valid: ${ns.quoted}")
        }
        ShowViewsCommand(ns.head, pattern)
      case _ =>
        throw new AnalysisException(s"Catalog ${resolved.catalog.name} doesn't support " +
          "SHOW VIEWS, only SessionCatalog supports this command.")
    }

  case ShowTableProperties(
      r @ ResolvedTable(_, _, _: V1Table), propertyKey) if isSessionCatalog(r.catalog) =>
    ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)

  case ShowTableProperties(r: ResolvedView, propertyKey) =>
    ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)

  case DescribeFunction(ResolvedFunc(identifier), extended) =>
    DescribeFunctionCommand(identifier.asFunctionIdentifier, extended)

  case ShowFunctions(None, userScope, systemScope, pattern) =>
    ShowFunctionsCommand(None, pattern, userScope, systemScope)

  case ShowFunctions(Some(ResolvedFunc(identifier)), userScope, systemScope, _) =>
    val funcIdentifier = identifier.asFunctionIdentifier
    ShowFunctionsCommand(
      funcIdentifier.database, Some(funcIdentifier.funcName), userScope, systemScope)

  case DropFunction(ResolvedFunc(identifier), ifExists, isTemp) =>
    val funcIdentifier = identifier.asFunctionIdentifier
    DropFunctionCommand(funcIdentifier.database, funcIdentifier.funcName, ifExists, isTemp)

  case CreateFunctionStatement(nameParts,
      className, resources, isTemp, ignoreIfExists, replace) =>
    if (isTemp) {
      // temp func doesn't belong to any catalog and we shouldn't resolve catalog in the name.
      val database = if (nameParts.length > 2) {
        throw new AnalysisException(s"Unsupported function name '${nameParts.quoted}'")
      } else if (nameParts.length == 2) {
        Some(nameParts.head)
      } else {
        None
      }
      CreateFunctionCommand(
        database,
        nameParts.last,
        className,
        resources,
        isTemp,
        ignoreIfExists,
        replace)
    } else {
      val FunctionIdentifier(function, database) =
        parseSessionCatalogFunctionIdentifier(nameParts)
      CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists,
        replace)
    }

  case RefreshFunction(ResolvedFunc(identifier)) =>
    // Fallback to v1 command
    val funcIdentifier = identifier.asFunctionIdentifier
    RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
}
/**
 * Returns the multi-part table name when it resolves to the session catalog;
 * fails analysis otherwise, since `sql` is only supported with v1 tables.
 */
private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
  SessionCatalogAndTable.unapply(tableName).map(_._2).getOrElse {
    throw new AnalysisException(s"$sql is only supported with v1 tables.")
  }
}
/**
 * Returns the multi-part name when it is a temp view or resolves to the session
 * catalog; fails analysis otherwise, since `sql` has no v2 equivalent.
 */
private def parseTempViewOrV1Table(
    nameParts: Seq[String], sql: String): Seq[String] = {
  TempViewOrV1Table.unapply(nameParts).getOrElse {
    throw new AnalysisException(s"$sql is only supported with temp views or v1 tables.")
  }
}
/**
 * Builds the v1 `CatalogTable` metadata for a CREATE TABLE [AS SELECT] statement.
 * A table with an explicit location is EXTERNAL; otherwise it is MANAGED.
 * The `ifNotExists` flag is accepted for signature parity but does not affect
 * the produced metadata.
 */
private def buildCatalogTable(
    table: TableIdentifier,
    schema: StructType,
    partitioning: Seq[Transform],
    bucketSpec: Option[BucketSpec],
    properties: Map[String, String],
    provider: String,
    options: Map[String, String],
    location: Option[String],
    comment: Option[String],
    ifNotExists: Boolean): CatalogTable = {
  val tableType =
    if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
  CatalogTable(
    identifier = table,
    tableType = tableType,
    // Statement-level OPTIONS become storage properties; LOCATION becomes the URI.
    storage = CatalogStorageFormat.empty.copy(
      locationUri = location.map(CatalogUtils.stringToURI),
      properties = options),
    schema = schema,
    provider = Some(provider),
    partitionColumnNames = partitioning.asPartitionColumns,
    bucketSpec = bucketSpec,
    properties = properties,
    comment = comment)
}
/**
 * Extractor for a multi-part name that resolves to the session catalog, yielding
 * the catalog plugin together with the table name parts.
 */
object SessionCatalogAndTable {
  def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = {
    nameParts match {
      case SessionCatalogAndIdentifier(sessionCatalog, ident) =>
        Some((sessionCatalog, ident.asMultipartIdentifier))
      case _ =>
        None
    }
  }
}
/**
 * Extractor matching either a temp view name (returned as-is) or a name that
 * resolves to a table in the session catalog.
 */
object TempViewOrV1Table {
  def unapply(nameParts: Seq[String]): Option[Seq[String]] = {
    if (isTempView(nameParts)) {
      // Temp views keep their original name; no catalog resolution happens.
      Some(nameParts)
    } else {
      nameParts match {
        case SessionCatalogAndIdentifier(_, ident) => Some(ident.asMultipartIdentifier)
        case _ => None
      }
    }
  }
}
/**
 * Extractor for a resolved namespace whose catalog is the session catalog,
 * yielding the catalog plugin and the namespace parts.
 */
object SessionCatalogAndNamespace {
  def unapply(resolved: ResolvedNamespace): Option[(CatalogPlugin, Seq[String])] = {
    if (!isSessionCatalog(resolved.catalog)) {
      None
    } else {
      Some((resolved.catalog, resolved.namespace))
    }
  }
}
/**
 * Fails analysis when `colName` refers to a nested column (more than one name
 * part), since `command` only supports top-level columns.
 */
private def assertTopLevelColumn(colName: Seq[String], command: String): Unit = {
  if (colName.lengthCompare(1) > 0) {
    throw new AnalysisException(s"$command does not support nested column: ${colName.quoted}")
  }
}
/**
 * Converts a column definition from an ALTER TABLE ADD COLUMNS statement into a
 * v1 `StructField`. Char/varchar types are replaced by their Hive string
 * representation, with the original type preserved in the field metadata.
 * The field is always nullable (v1 ADD COLUMN rejects NOT NULL earlier).
 */
private def convertToStructField(col: QualifiedColType): StructField = {
  val replacedType = HiveStringType.replaceCharType(col.dataType)
  val metadataBuilder = new MetadataBuilder
  col.comment.foreach(c => metadataBuilder.putString("comment", c))
  // Record the original Hive type string only when a char/varchar was replaced.
  if (col.dataType != replacedType) {
    metadataBuilder.putString(HIVE_TYPE_STRING, col.dataType.catalogString)
  }
  StructField(col.name.head, replacedType, nullable = true, metadataBuilder.build())
}
/**
 * Returns true when `provider` maps to a usable v2 data source. Unknown
 * providers and file-based v2 sources are treated as v1.
 */
private def isV2Provider(provider: String): Boolean = {
  // TODO(SPARK-28396): Currently file source v2 can't work with tables.
  DataSource.lookupDataSourceV2(provider, conf).exists {
    case _: FileDataSourceV2 => false
    case _ => true
  }
}
}