/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.datasources.v2

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, Table}
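
/**
 * Physical plan node backing `DESCRIBE TABLE` for a DataSource V2 [[Table]]. Each output row
 * carries three string columns: column name, data type, and comment. When `isExtended` is set
 * (the `DESCRIBE TABLE EXTENDED` form), the metadata-column and detailed-table-information
 * sections are appended as well.
 */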
case class DescribeTableExec(
    output: Seq[Attribute],
    table: Table,
    isExtended: Boolean) extends LeafV2CommandExec {

  override protected def run(): Seq[InternalRow] = {
    val rows = new ArrayBuffer[InternalRow]()
    addSchema(rows)
    addPartitioning(rows)
    if (isExtended) {
      // The extra sections below are only rendered for DESCRIBE TABLE EXTENDED.
      addMetadataColumns(rows)
      addTableDetails(rows)
    }
    rows.toSeq
  }

  private def addTableDetails(rows: ArrayBuffer[InternalRow]): Unit = {
    rows += emptyRow()
    rows += toCatalystRow("# Detailed Table Information", "", "")
    rows += toCatalystRow("Name", table.name(), "")

    // Reserved properties (e.g. owner, provider, location) each get their own row,
    // with the capitalized property key as the row label.
    CatalogV2Util.TABLE_RESERVED_PROPERTIES.foreach { propKey =>
      if (table.properties.containsKey(propKey)) {
        rows += toCatalystRow(propKey.capitalize, table.properties.get(propKey), "")
      }
    }

    // All remaining (non-reserved) properties are folded into a single sorted
    // "[k1=v1,k2=v2,...]" row.
    val properties =
      table.properties.asScala.toList
        .filter(kv => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(kv._1))
        .sortBy(_._1).map {
          case (key, value) => key + "=" + value
        }.mkString("[", ",", "]")
    rows += toCatalystRow("Table Properties", properties, "")
  }

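  // A minimal sketch of the rows addTableDetails() appends, for a hypothetical table
  // "testcat.ns.tbl" with a reserved "owner" property and two plain properties
  // (all names and values illustrative, not taken from this file):
  //
  //   # Detailed Table Information
  //   Name                testcat.ns.tbl
  //   Owner               spark
  //   Table Properties    [k1=v1,k2=v2]
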
  private def addSchema(rows: ArrayBuffer[InternalRow]): Unit = {
    rows ++= table.schema.map { column =>
      toCatalystRow(
        column.name, column.dataType.simpleString, column.getComment().getOrElse(""))
    }
  }

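  // For example (hypothetical columns, not from this file), a schema of
  // (id BIGINT, data STRING COMMENT 'payload') yields:
  //
  //   id      bigint
  //   data    string    payload
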
  private def addMetadataColumns(rows: ArrayBuffer[InternalRow]): Unit = table match {
    // Only tables that implement SupportsMetadataColumns contribute this section.
    case hasMeta: SupportsMetadataColumns if hasMeta.metadataColumns.nonEmpty =>
      rows += emptyRow()
      rows += toCatalystRow("# Metadata Columns", "", "")
      rows ++= hasMeta.metadataColumns.map { column =>
        toCatalystRow(
          column.name,
          column.dataType.simpleString,
          Option(column.comment()).getOrElse(""))
      }
    case _ =>
  }

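  // Sketch of the section above for a table exposing a single hypothetical metadata column
  // named "_partition" (illustrative; the actual metadata columns depend on the connector):
  //
  //   # Metadata Columns
  //   _partition    string    Partition key used for storing the row
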
  private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
    rows += emptyRow()
    rows += toCatalystRow("# Partitioning", "", "")
    if (table.partitioning.isEmpty) {
      rows += toCatalystRow("Not partitioned", "", "")
    } else {
      // One row per partition transform, labeled by its position.
      rows ++= table.partitioning.zipWithIndex.map {
        case (transform, index) => toCatalystRow(s"Part $index", transform.describe(), "")
      }
    }
  }

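  // E.g. (hypothetical transforms, not from this file) a table partitioned by
  // bucket(4, id) and days(ts) renders as:
  //
  //   # Partitioning
  //   Part 0    bucket(4, id)
  //   Part 1    days(ts)
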
  private def emptyRow(): InternalRow = toCatalystRow("", "", "")
}
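
// A minimal usage sketch, assuming a v2 catalog named "testcat" is already configured in the
// session (the catalog, namespace, and table names are hypothetical, not part of this file).
// The EXTENDED form is what drives the isExtended branch above:
//
//   spark.sql("DESCRIBE TABLE EXTENDED testcat.ns.tbl").show(truncate = false)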