/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.parser

import scala.collection.mutable

import org.antlr.v4.runtime.tree.TerminalNode
import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
import org.apache.spark.sql.catalyst.parser.ParserUtils._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
import org.apache.spark.sql.execution.command.{PartitionerField, TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.command.table.{CarbonCreateTableAsSelectCommand, CarbonCreateTableCommand}
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.SchemaReader
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}

/**
 * Concrete parser for Spark SQL statements and Carbon-specific statements.
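 *
 * A minimal usage sketch (assumes a live SparkSession named `sparkSession` is in scope):
 * {{{
 *   val parser = new CarbonSparkSqlParser(sparkSession.sessionState.conf, sparkSession)
 *   val plan = parser.parsePlan(
 *     "CREATE TABLE t1(id INT, name STRING) STORED BY 'carbondata'")
 * }}}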
*/
class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser {
val parser = new CarbonSpark2SqlParser
val astBuilder = CarbonReflectionUtils.getAstBuilder(conf, parser, sparkSession)
private val substitutor = new VariableSubstitution(conf)
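
  /**
   * Parses the given SQL text. Spark's parser (with the Carbon AST builder) is tried first;
   * if it fails with anything other than a MalformedCarbonCommandException, the
   * Carbon-specific parser is tried as a fallback, and if both fail the two parse errors
   * are reported together.
   */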
override def parsePlan(sqlText: String): LogicalPlan = {
CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
try {
val parsedPlan = super.parsePlan(sqlText)
CarbonScalaUtil.cleanParserThreadLocals
parsedPlan
} catch {
case ce: MalformedCarbonCommandException =>
CarbonScalaUtil.cleanParserThreadLocals
throw ce
      case ex: Throwable =>
try {
parser.parse(sqlText)
} catch {
case mce: MalformedCarbonCommandException =>
throw mce
          case e: Throwable =>
CarbonException.analysisException(
s"""== Parse1 ==
|${ex.getMessage}
|== Parse2 ==
|${e.getMessage}
""".stripMargin.trim)
}
}
}
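
  /**
   * Applies variable substitution to the command text before delegating to the
   * underlying parser.
   */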
protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
super.parse(substitutor.substitute(command))(toResult)
}
}
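
/**
 * Helper AST builder that extends Spark's SparkSqlAstBuilder with Carbon-specific
 * handling of CREATE TABLE statements and their table properties.
 */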
class CarbonHelperSqlAstBuilder(conf: SQLConf,
parser: CarbonSpark2SqlParser,
sparkSession: SparkSession)
extends SparkSqlAstBuilder(conf) {
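  /**
   * Returns the storage handler string of a `STORED BY '<handler>'` clause (including the
   * surrounding quotes), or an empty string for `STORED AS <format>` or when no file
   * format clause is present.
   */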
def getFileStorage(createFileFormat: CreateFileFormatContext): String = {
Option(createFileFormat) match {
case Some(value) =>
if (value.children.get(1).getText.equalsIgnoreCase("by")) {
value.storageHandler().STRING().getSymbol.getText
} else {
// The case of "STORED AS PARQUET/ORC"
""
}
case _ => ""
}
}
  /**
   * Converts the database name to lower case.
   *
   * @param dbName the optional database name
   * @return the lower-cased database name wrapped in Option, or None if absent
   */
def convertDbNameToLowerCase(dbName: Option[String]): Option[String] = {
dbName match {
case Some(databaseName) => Some(databaseName.toLowerCase)
case None => dbName
}
}
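
  /**
   * Returns true if the value of the given table property should be converted to lower
   * case. Values of LIST_INFO and RANGE_INFO keep their original case.
   */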
def needToConvertToLowerCase(key: String): Boolean = {
val noConvertList = Array("LIST_INFO", "RANGE_INFO")
    !noConvertList.exists(x => x.equalsIgnoreCase(key))
}

  /**
   * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are
   * specified. Keys are always lower-cased; values are lower-cased as well unless the key
   * is in the no-convert list (see [[needToConvertToLowerCase]]).
   */
def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
val props = visitTablePropertyList(ctx)
val badKeys = props.filter { case (_, v) => v == null }.keys
if (badKeys.nonEmpty) {
operationNotAllowed(
s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx)
}
props.map { case (key, value) =>
if (needToConvertToLowerCase(key)) {
(key.toLowerCase, value.toLowerCase)
} else {
(key.toLowerCase, value)
}
}
}
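
  /**
   * Null-safe wrapper around [[visitPropertyKeyValues]]; yields an empty map when the
   * statement has no TBLPROPERTIES clause.
   */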
  def getPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
    Option(ctx).map(visitPropertyKeyValues).getOrElse(Map.empty)
  }
def createCarbonTable(createTableTuple: (CreateTableHeaderContext, SkewSpecContext,
BucketSpecContext, ColTypeListContext, ColTypeListContext, TablePropertyListContext,
LocationSpecContext, Option[String], TerminalNode, QueryContext, String)): LogicalPlan = {
val (tableHeader, skewSpecContext,
bucketSpecContext,
partitionColumns,
columns,
tablePropertyList,
locationSpecContext,
tableComment,
ctas,
query,
provider) = createTableTuple
val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
// TODO: implement temporary tables
if (temp) {
throw new ParseException(
"CREATE TEMPORARY TABLE is not supported yet. " +
"Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader)
}
if (skewSpecContext != null) {
operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext)
}
if (bucketSpecContext != null) {
operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext)
}
val cols = Option(columns).toSeq.flatMap(visitColTypeList)
val properties = getPropertyKeyValues(tablePropertyList)
// Ensuring whether no duplicate name is used in table definition
val colNames = cols.map(_.name)
if (colNames.length != colNames.distinct.length) {
val duplicateColumns = colNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => "\"" + x + "\""
}
operationNotAllowed(s"Duplicated column names found in table definition of " +
s"$tableIdentifier: ${duplicateColumns.mkString("[", ",", "]")}", columns)
}
val tablePath = if (locationSpecContext != null) {
Some(visitLocationSpec(locationSpecContext))
} else {
None
}
val tableProperties = mutable.Map[String, String]()
    properties.foreach { case (key, value) => tableProperties.put(key, value) }
// validate partition clause
val (partitionByStructFields, partitionFields) =
      validatePartitionFields(partitionColumns, colNames, tableProperties)
    // re-validate partition columns case-insensitively against the schema
if (partitionFields.nonEmpty) {
if (!CommonUtil.validatePartitionColumns(tableProperties, partitionFields)) {
throw new MalformedCarbonCommandException("Error: Invalid partition definition")
}
// partition columns should not be part of the schema
val badPartCols = partitionFields
.map(_.partitionColumn.toLowerCase)
.toSet
.intersect(colNames.map(_.toLowerCase).toSet)
if (badPartCols.nonEmpty) {
        operationNotAllowed("Partition columns should not be specified in the schema: " +
          badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"),
          partitionColumns)
}
}
val options = new CarbonOption(properties)
// validate streaming property
validateStreamingProperty(options)
var fields = parser.getFields(cols ++ partitionByStructFields)
// validate for create table as select
val selectQuery = Option(query).map(plan)
selectQuery match {
case Some(q) =>
// create table as select does not allow creation of partitioned table
if (partitionFields.nonEmpty) {
val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
"create a partitioned table using Carbondata file formats."
operationNotAllowed(errorMessage, partitionColumns)
}
// create table as select does not allow to explicitly specify schema
if (fields.nonEmpty) {
operationNotAllowed(
"Schema may not be specified in a Create Table As Select (CTAS) statement", columns)
}
        // external table is not allowed
if (external) {
operationNotAllowed("Create external table as select", tableHeader)
}
        fields = parser
          .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
            .getSchemaFromUnresolvedRelation(sparkSession, q))
case _ =>
// ignore this case
}
if (partitionFields.nonEmpty && options.isStreaming) {
operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns)
}
// validate tblProperties
val bucketFields = parser.getBucketFields(tableProperties, fields, options)
val tableInfo = if (external) {
// read table info from schema file in the provided table path
val identifier = AbsoluteTableIdentifier.from(
tablePath.get,
CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession),
tableIdentifier.table)
val table = try {
val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath)) &&
provider.equalsIgnoreCase("'carbonfile'")) {
SchemaReader.inferSchema(identifier)
        } else {
SchemaReader.getTableInfo(identifier)
}
      } catch {
case e: Throwable =>
          operationNotAllowed(s"Invalid table path provided: ${tablePath.get}", tableHeader)
}
// set "_external" property, so that DROP TABLE will not delete the data
if (provider.equalsIgnoreCase("'carbonfile'")) {
table.getFactTable.getTableProperties.put("_filelevelformat", "true")
table.getFactTable.getTableProperties.put("_external", "false")
} else {
table.getFactTable.getTableProperties.put("_external", "true")
table.getFactTable.getTableProperties.put("_filelevelformat", "false")
}
table
} else {
// prepare table model of the collected tokens
val tableModel: TableModel = parser.prepareTableModel(
ifNotExists,
convertDbNameToLowerCase(tableIdentifier.database),
tableIdentifier.table.toLowerCase,
fields,
partitionFields,
tableProperties,
bucketFields,
isAlterFlow = false,
tableComment)
TableNewProcessor(tableModel)
}
selectQuery match {
      case Some(q) =>
        CarbonCreateTableAsSelectCommand(
          tableInfo = tableInfo,
          query = q,
ifNotExistsSet = ifNotExists,
tableLocation = tablePath)
case _ =>
CarbonCreateTableCommand(
tableInfo = tableInfo,
ifNotExistsSet = ifNotExists,
tableLocation = tablePath)
}
}
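
  /**
   * Validates the 'streaming' table property: [[CarbonOption.isStreaming]] throws an
   * IllegalArgumentException for values other than 'true' or 'false', which is surfaced
   * here as a MalformedCarbonCommandException.
   */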
private def validateStreamingProperty(carbonOption: CarbonOption): Unit = {
try {
carbonOption.isStreaming
} catch {
case _: IllegalArgumentException =>
throw new MalformedCarbonCommandException(
"Table property 'streaming' should be either 'true' or 'false'")
}
}
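
  /**
   * Extracts the PARTITIONED BY columns as StructFields and PartitionerFields, and checks
   * that the partition definition is valid and that no partition column repeats a schema
   * column.
   */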
  private def validatePartitionFields(
partitionColumns: ColTypeListContext,
colNames: Seq[String],
tableProperties: mutable.Map[String, String]): (Seq[StructField], Seq[PartitionerField]) = {
val partitionByStructFields = Option(partitionColumns).toSeq.flatMap(visitColTypeList)
val partitionerFields = partitionByStructFields.map { structField =>
PartitionerField(structField.name, Some(structField.dataType.toString), null)
}
if (partitionerFields.nonEmpty) {
if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
throw new MalformedCarbonCommandException("Error: Invalid partition definition")
}
// partition columns should not be part of the schema
val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
if (badPartCols.nonEmpty) {
        operationNotAllowed("Partition columns should not be specified in the schema: " +
          badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), partitionColumns)
}
}
(partitionByStructFields, partitionerFields)
}
}
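
/**
 * Minimal contract for Carbon AST builders: resolve the file storage handler of a
 * CREATE TABLE statement (see [[CarbonHelperSqlAstBuilder.getFileStorage]]).
 */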
trait CarbonAstTrait {
  def getFileStorage(createFileFormat: CreateFileFormatContext): String
}