/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parser

import scala.collection.mutable

import org.antlr.v4.runtime.tree.TerminalNode

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.parser.ParserUtils.operationNotAllowed
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{PartitionerField, TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
import org.apache.spark.sql.types.StructField

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.SchemaReader
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.util.CarbonScalaUtil
/**
* Utility class to validate the create table and CTAS command,
* and to prepare the logical plan of create table and CTAS command.
*/
object CarbonSparkSqlParserUtil {
/**
* Validates that the value configured for the 'streaming' table property is valid.
*
* @param carbonOption instance of CarbonOption holding all options required by the datasource.
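*
* A minimal usage sketch (the option map below is illustrative):
* {{{
*   // 'streaming' must be "true" or "false"; anything else raises
*   // MalformedCarbonCommandException
*   validateStreamingProperty(new CarbonOption(Map("streaming" -> "true")))
* }}}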
*/
private def validateStreamingProperty(carbonOption: CarbonOption): Unit = {
try {
carbonOption.isStreaming
} catch {
case _: IllegalArgumentException =>
throw new MalformedCarbonCommandException(
"Table property 'streaming' should be either 'true' or 'false'")
}
}
/**
* Validates the create table command and returns the LogicalPlan of the
* create table or CTAS command.
*
* @param createTableTuple a tuple of (CreateTableHeaderContext, SkewSpecContext,
*                         BucketSpecContext, ColTypeListContext, ColTypeListContext,
*                         TablePropertyListContext,
*                         LocationSpecContext, Option[String], TerminalNode, QueryContext,
*                         String)
* @param extraTableTuple  a tuple of (Seq[StructField], Boolean, TableIdentifier, Boolean,
*                         Seq[String],
*                         Option[String], mutable.Map[String, String], Map[String, String],
*                         Seq[StructField],
*                         Seq[PartitionerField], CarbonSpark2SqlParser, SparkSession,
*                         Option[LogicalPlan])
* @return <LogicalPlan> of the create table or CTAS command
*
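* A sketch of the two SQL shapes this method distinguishes (table names are
* illustrative):
* {{{
*   CREATE TABLE t1 (id INT, name STRING) STORED AS carbondata
*   CREATE TABLE t2 STORED AS carbondata AS SELECT * FROM t1   -- CTAS
* }}}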
*/
def createCarbonTable(createTableTuple: (CreateTableHeaderContext, SkewSpecContext,
BucketSpecContext, ColTypeListContext, ColTypeListContext, TablePropertyListContext,
LocationSpecContext, Option[String], TerminalNode, QueryContext, String),
extraTableTuple: (Seq[StructField], Boolean, TableIdentifier, Boolean, Seq[String],
Option[String], mutable.Map[String, String], Map[String, String], Seq[StructField],
Seq[PartitionerField], CarbonSpark2SqlParser, SparkSession,
Option[LogicalPlan])): LogicalPlan = {
val (tableHeader, skewSpecContext, bucketSpecContext, partitionColumns, columns,
tablePropertyList, locationSpecContext, tableComment, ctas, query, provider) = createTableTuple
val (cols, external, tableIdentifier, ifNotExists, colNames, tablePath,
tableProperties, properties, partitionByStructFields, partitionFields,
parser, sparkSession, selectQuery) = extraTableTuple
val options = new CarbonOption(properties)
// validate streaming property
validateStreamingProperty(options)
var fields = parser.getFields(cols ++ partitionByStructFields)
// validate for create table as select
selectQuery match {
case Some(q) =>
// create table as select does not allow creation of partitioned table
if (partitionFields.nonEmpty) {
val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
"create a partitioned table using Carbondata file formats."
operationNotAllowed(errorMessage, partitionColumns)
}
// create table as select does not allow to explicitly specify schema
if (fields.nonEmpty) {
operationNotAllowed(
"Schema may not be specified in a Create Table As Select (CTAS) statement", columns)
}
// external table is not allowed
if (external) {
operationNotAllowed("Create external table as select", tableHeader)
}
// derive the schema from the CTAS select query instead
fields = parser
.getFields(CarbonEnv.getInstance(sparkSession).carbonMetaStore
.getSchemaFromUnresolvedRelation(sparkSession, q))
case _ =>
// not a CTAS statement, nothing more to validate
}
val columnNames = fields.map(_.name.get)
checkIfDuplicateColumnExists(columns, tableIdentifier, columnNames)
if (partitionFields.nonEmpty && options.isStreaming) {
operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns)
}
if (!external && fields.isEmpty) {
throw new MalformedCarbonCommandException("Creating table without column(s) is not supported")
}
if (external && fields.isEmpty && tableProperties.nonEmpty) {
// since fields are always empty for an external table, the table properties cannot be validated
operationNotAllowed(
"Table properties are not supported for external table", tablePropertyList)
}
// validate tblProperties
val bucketFields = parser.getBucketFields(tableProperties, fields, options)
var isTransactionalTable: Boolean = true
val tableInfo = if (external) {
if (fields.nonEmpty) {
// the user provided a schema for this external table, which is currently not allowed
// see CARBONDATA-2866
operationNotAllowed(
"Schema must not be specified for external table", columns)
}
if (partitionByStructFields.nonEmpty) {
operationNotAllowed(
"Partition is not supported for external table", partitionColumns)
}
// read table info from schema file in the provided table path
// the table name of an external table must also be converted to lower case
val identifier = AbsoluteTableIdentifier.from(
tablePath.get,
CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(),
tableIdentifier.table.toLowerCase())
val table = try {
val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
if (!FileFactory.isFileExist(schemaPath)) {
if (provider.equalsIgnoreCase("'carbonfile'")) {
SchemaReader.inferSchema(identifier, true)
} else {
isTransactionalTable = false
SchemaReader.inferSchema(identifier, false)
}
} else {
SchemaReader.getTableInfo(identifier)
}
} catch {
case _: Throwable =>
operationNotAllowed(s"Invalid table path provided: ${tablePath.get}", tableHeader)
}
// set "_external" property, so that DROP TABLE will not delete the data
if (provider.equalsIgnoreCase("'carbonfile'")) {
table.getFactTable.getTableProperties.put("_filelevelformat", "true")
table.getFactTable.getTableProperties.put("_external", "false")
} else {
table.getFactTable.getTableProperties.put("_external", "true")
table.getFactTable.getTableProperties.put("_filelevelformat", "false")
}
// if local dictionary is not configured on the table, fall back to the system-level default
var isLocalDicEnabled = table.getFactTable.getTableProperties
.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
if (null == isLocalDicEnabled) {
table.getFactTable.getTableProperties
.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.LOCAL_DICTIONARY_SYSTEM_ENABLE,
CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT))
}
isLocalDicEnabled = table.getFactTable.getTableProperties
.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
// when local dictionary is enabled, mark all string and varchar columns
// as local-dictionary columns
if (CarbonScalaUtil.validateLocalDictionaryEnable(isLocalDicEnabled) &&
isLocalDicEnabled.toBoolean) {
val allColumns = table.getFactTable.getListOfColumns
for (i <- 0 until allColumns.size()) {
val column = allColumns.get(i)
if (column.getDataType == DataTypes.STRING || column.getDataType == DataTypes.VARCHAR) {
column.setLocalDictColumn(true)
}
allColumns.set(i, column)
}
table.getFactTable.setListOfColumns(allColumns)
}
table
} else {
// prepare the table model from the collected tokens
val tableModel: TableModel = parser.prepareTableModel(
ifNotExists,
convertDbNameToLowerCase(tableIdentifier.database),
tableIdentifier.table.toLowerCase,
fields,
partitionFields,
tableProperties,
bucketFields,
isAlterFlow = false,
tableComment)
TableNewProcessor(tableModel)
}
tableInfo.setTransactionalTable(isTransactionalTable)
selectQuery match {
case Some(q) =>
CarbonCreateTableAsSelectCommand(
tableInfo = tableInfo,
query = q,
ifNotExistsSet = ifNotExists,
tableLocation = tablePath)
case _ =>
CarbonCreateTableCommand(
tableInfo = tableInfo,
ifNotExistsSet = ifNotExists,
tableLocation = tablePath,
external)
}
}
/**
* Converts the database name to lower case, if defined.
*
* @param dbName database name.
* @return the lower-cased database name as an Option[String].
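*
* For example (a direct illustration of the match below):
* {{{
*   convertDbNameToLowerCase(Some("MyDB"))   // Some("mydb")
*   convertDbNameToLowerCase(None)           // None
* }}}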
*/
def convertDbNameToLowerCase(dbName: Option[String]): Option[String] = {
dbName match {
case Some(databaseName) => Some(databaseName.toLowerCase)
case None => dbName
}
}
/**
* Validates the partition columns and returns the partitioner fields.
*
* @param partitionColumns        An instance of ColTypeListContext having parser rules for
*                                column.
* @param colNames                <Seq[String]> Sequence of Table column names.
* @param tableProperties         <Map[String, String]> Table property map.
* @param partitionByStructFields <Seq[StructField]> Sequence of partition fields.
* @return <Seq[PartitionerField]> A Seq of partitioner fields.
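*
* A minimal sketch (the parser context is used only for error reporting; the
* column and field names below are illustrative):
* {{{
*   // yields Seq(PartitionerField("dt", Some("StringType"), null))
*   validatePartitionFields(ctx, Seq("id", "name"),
*     mutable.Map.empty, Seq(StructField("dt", StringType)))
* }}}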
*/
def validatePartitionFields(
partitionColumns: ColTypeListContext,
colNames: Seq[String],
tableProperties: mutable.Map[String, String],
partitionByStructFields: Seq[StructField]): Seq[PartitionerField] = {
val partitionerFields = partitionByStructFields.map { structField =>
PartitionerField(structField.name, Some(structField.dataType.toString), null)
}
// validate partition clause
if (partitionerFields.nonEmpty) {
// partition columns should not be part of the schema
val badPartCols = partitionerFields.map(_.partitionColumn.toLowerCase).toSet
.intersect(colNames.map(_.toLowerCase).toSet)
if (badPartCols.nonEmpty) {
operationNotAllowed(s"Partition columns should not be specified in the schema: " +
badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]")
, partitionColumns: ColTypeListContext)
}
}
partitionerFields
}
/**
* Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
*
* @param ctx Instance of TablePropertyListContext defining parser rule for the table
* properties.
* @param props <Map[String, String]> Map of table property list
* @return <Map[String, String]> Map of transformed table property.
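*
* For example (keys are lower-cased; a null value triggers operationNotAllowed):
* {{{
*   visitPropertyKeyValues(ctx, Map("Streaming" -> "true"))
*   // => Map("streaming" -> "true")
* }}}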
*/
def visitPropertyKeyValues(ctx: TablePropertyListContext,
props: Map[String, String]): Map[String, String] = {
val badKeys = props.filter { case (_, v) => v == null }.keys
if (badKeys.nonEmpty) {
operationNotAllowed(
s"Values must be specified for key(s): ${ badKeys.mkString("[", ",", "]") }", ctx)
}
props.map { case (key, value) =>
(key.toLowerCase, value)
}
}
/**
* Validates the create table command and returns the table's column names.
*
* @param tableHeader An instance of CreateTableHeaderContext having parser rules for
* create table.
* @param skewSpecContext An instance of SkewSpecContext having parser rules for create table.
* @param bucketSpecContext An instance of BucketSpecContext having parser rules for create table.
* @param columns An instance of ColTypeListContext having parser rules for columns
* of the table.
* @param cols Table's columns.
* @param tableIdentifier Instance of table identifier.
* @param isTempTable Flag to identify temp table.
* @return Table's column names <Seq[String]>.
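*
* A minimal sketch (contexts come from the parser and may be null when the
* corresponding clause is absent; names and types are illustrative):
* {{{
*   // returns Seq("id", "name") after duplicate-name validation
*   validateCreateTableReqAndGetColumns(header, null, null, columnsCtx,
*     Seq(StructField("id", IntegerType), StructField("name", StringType)),
*     TableIdentifier("t1"), isTempTable = false)
* }}}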
*/
def validateCreateTableReqAndGetColumns(tableHeader: CreateTableHeaderContext,
skewSpecContext: SkewSpecContext,
bucketSpecContext: BucketSpecContext,
columns: ColTypeListContext,
cols: Seq[StructField],
tableIdentifier: TableIdentifier,
isTempTable: Boolean): Seq[String] = {
// TODO: implement temporary tables
if (isTempTable) {
throw new ParseException(
"CREATE TEMPORARY TABLE is not supported yet. " +
"Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader)
}
if (skewSpecContext != null) {
operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext)
}
if (bucketSpecContext != null) {
operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext)
}
// ensure that no duplicate column name is used in the table definition
val colNames: Seq[String] = cols.map(_.name)
checkIfDuplicateColumnExists(columns, tableIdentifier, colNames)
colNames
}
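/**
* Throws an error if the given column names contain duplicates. The error is
* reported against the given ColTypeListContext when it is available; for CTAS,
* where that context is null, an UnsupportedOperationException is thrown instead.
*/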
private def checkIfDuplicateColumnExists(columns: ColTypeListContext,
tableIdentifier: TableIdentifier,
colNames: Seq[String]): Unit = {
if (colNames.length != colNames.distinct.length) {
val duplicateColumns = colNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => "\"" + x + "\""
}
val errorMessage = s"Duplicated column names found in table definition of " +
s"$tableIdentifier: ${ duplicateColumns.mkString("[", ",", "]") }"
// in case of create table as select, ColTypeListContext will be null;
// if so, report the duplicate columns without a parser context
if (null != columns) {
operationNotAllowed(errorMessage, columns)
} else {
throw new UnsupportedOperationException(errorMessage)
}
}
}
/**
* Returns the storage format specified in the create table statement.
*
* @param createFileFormat parser context of the STORED BY / STORED AS clause.
* @return the storage format name, or an empty string if none is specified.
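*
* For example (each case is shown as the SQL fragment that produces it; note
* that STORED BY keeps the quotes around the handler name):
* {{{
*   STORED BY 'carbondata'   =>  "'carbondata'"
*   STORED AS carbondata     =>  "carbondata"
*   (no STORED clause)       =>  ""
* }}}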
*/
def getFileStorage(createFileFormat: CreateFileFormatContext): String = {
Option(createFileFormat) match {
case Some(value) =>
val result = value.children.get(1).getText
if (result.equalsIgnoreCase("by")) {
value.storageHandler().STRING().getSymbol.getText
} else if (result.equalsIgnoreCase("as") && value.children.size() > 1) {
value.children.get(2).getText
} else {
// The case of "STORED AS PARQUET/ORC"
""
}
case _ => ""
}
}
}