/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.parser
import scala.collection.mutable
import scala.language.implicitConversions
import org.apache.spark.sql.{CarbonToSparkAdapter, DeleteRecords, UpdateTable}
import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRebuildCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand}
import org.apache.spark.sql.execution.command.management._
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableColRenameDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.execution.command.cache.{CarbonDropCacheCommand, CarbonShowCacheCommand}
import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand, CarbonShowStreamsCommand}
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
/**
 * TODO: remove the duplicated code and move the common methods to a common class.
 * Parser for all Carbon DDL and DML statements in a unified context.
*/
class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
override def parse(input: String): LogicalPlan = {
synchronized {
// Initialize the Keywords.
initLexical
phrase(start)(new lexical.Scanner(input)) match {
case Success(plan, _) =>
CarbonScalaUtil.cleanParserThreadLocals()
plan match {
case x: CarbonLoadDataCommand =>
x.inputSqlString = input
x
case x: CarbonAlterTableCompactionCommand =>
x.alterTableModel.alterSql = input
x
case logicalPlan => logicalPlan
}
case failureOrError =>
CarbonScalaUtil.cleanParserThreadLocals()
CarbonException.analysisException(failureOrError.toString)
}
}
}
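
  // A minimal usage sketch (illustrative; the SQL text is just one statement this
  // parser accepts):
  //   val parser = new CarbonSpark2SqlParser()
  //   val plan: LogicalPlan = parser.parse("CLEAN FILES FOR TABLE db1.t1")
  // On success the plan is a Carbon command (here a CarbonCleanFilesCommand); on
  // failure CarbonException.analysisException is raised with the parser error.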
protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
protected lazy val startCommand: Parser[LogicalPlan] =
loadManagement | showLoads | alterTable | restructure | updateTable | deleteRecords |
datamapManagement | alterTableFinishStreaming | stream | cli |
cacheManagement | alterDataMap | insertStageData
protected lazy val loadManagement: Parser[LogicalPlan] =
deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew | addLoad
protected lazy val restructure: Parser[LogicalPlan] =
alterTableColumnRenameAndModifyDataType | alterTableDropColumn | alterTableAddColumns
protected lazy val datamapManagement: Parser[LogicalPlan] =
createDataMap | dropDataMap | showDataMap | refreshDataMap
protected lazy val stream: Parser[LogicalPlan] =
createStream | dropStream | showStreams
protected lazy val cacheManagement: Parser[LogicalPlan] =
showCache | dropCache
protected lazy val alterTable: Parser[LogicalPlan] =
ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) ~
(WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",") <~ ")").? <~
opt(";") ^^ {
case dbName ~ table ~ (compact ~ compactType) ~ segs =>
val altertablemodel =
AlterTableModel(convertDbNameToLowerCase(dbName), table, None, compactType,
Some(System.currentTimeMillis()), null, segs)
CarbonAlterTableCompactionCommand(altertablemodel)
}
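
  // Illustrative statements accepted by the rule above (table names and the
  // compaction type are examples only):
  //   ALTER TABLE db1.t1 COMPACT 'minor'
  //   ALTER TABLE db1.t1 COMPACT 'custom' WHERE SEGMENT.ID IN (1, 2)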
/**
* The below syntax is used to change the status of the segment
* from "streaming" to "streaming finish".
* ALTER TABLE tableName FINISH STREAMING
*/
protected lazy val alterTableFinishStreaming: Parser[LogicalPlan] =
ALTER ~> TABLE ~> (ident <~ ".").? ~ ident <~ FINISH <~ STREAMING <~ opt(";") ^^ {
case dbName ~ table =>
CarbonAlterTableFinishStreaming(dbName, table)
}
/**
* The syntax of CREATE STREAM
* CREATE STREAM [IF NOT EXISTS] streamName ON TABLE [dbName.]tableName
* [STMPROPERTIES('KEY'='VALUE')]
* AS SELECT COUNT(COL1) FROM tableName
*/
protected lazy val createStream: Parser[LogicalPlan] =
CREATE ~> STREAM ~> opt(IF ~> NOT ~> EXISTS) ~ ident ~
(ON ~> TABLE ~> (ident <~ ".").?) ~ ident ~
(STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
(AS ~> restInput) <~ opt(";") ^^ {
case ifNotExists ~ streamName ~ dbName ~ tableName ~ options ~ query =>
val optionMap = options.getOrElse(List[(String, String)]()).toMap[String, String]
CarbonCreateStreamCommand(
streamName, dbName, tableName, ifNotExists.isDefined, optionMap, query)
}
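
  // Illustrative statement (names and STMPROPERTIES keys are examples only):
  //   CREATE STREAM IF NOT EXISTS s1 ON TABLE db1.src
  //   STMPROPERTIES('trigger'='ProcessingTime', 'interval'='5 seconds')
  //   AS SELECT * FROM db1.src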
/**
* The syntax of DROP STREAM
* DROP STREAM [IF EXISTS] streamName
*/
protected lazy val dropStream: Parser[LogicalPlan] =
DROP ~> STREAM ~> opt(IF ~> EXISTS) ~ ident <~ opt(";") ^^ {
case ifExists ~ streamName =>
CarbonDropStreamCommand(streamName, ifExists.isDefined)
}
/**
* The syntax of SHOW STREAMS
 * SHOW STREAMS [ON TABLE [dbName.]tableName]
*/
protected lazy val showStreams: Parser[LogicalPlan] =
SHOW ~> STREAMS ~> opt(ontable) <~ opt(";") ^^ {
case tableIdent =>
CarbonShowStreamsCommand(tableIdent)
}
/**
 * The below syntax is used to create a datamap.
 * CREATE DATAMAP [IF NOT EXISTS] datamapName [ON TABLE tableName]
 * USING 'DataMapProviderName'
 * [WITH DEFERRED REBUILD]
 * [DMPROPERTIES('KEY'='VALUE')] [AS SELECT COUNT(COL1) FROM tableName]
*/
protected lazy val createDataMap: Parser[LogicalPlan] =
CREATE ~> DATAMAP ~> opt(IF ~> NOT ~> EXISTS) ~ ident ~
opt(ontable) ~
(USING ~> stringLit) ~
opt(WITH ~> DEFERRED ~> REBUILD) ~
(DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
(AS ~> restInput).? <~ opt(";") ^^ {
case ifnotexists ~ dmname ~ tableIdent ~ dmProviderName ~ deferred ~ dmprops ~ query =>
val map = dmprops.getOrElse(List[(String, String)]()).toMap[String, String]
CarbonCreateDataMapCommand(dmname, tableIdent, dmProviderName, map, query,
ifnotexists.isDefined, deferred.isDefined)
}
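
  // Illustrative statement (names, provider and DMPROPERTIES keys are examples only):
  //   CREATE DATAMAP IF NOT EXISTS dm1 ON TABLE db1.t1
  //   USING 'bloomfilter'
  //   WITH DEFERRED REBUILD
  //   DMPROPERTIES('index_columns'='c1')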
protected lazy val ontable: Parser[TableIdentifier] =
ON ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
case dbName ~ tableName =>
TableIdentifier(tableName, dbName)
}
/**
 * The below syntax is used to drop a datamap.
 * DROP DATAMAP [IF EXISTS] datamapName [ON TABLE tableName]
*/
protected lazy val dropDataMap: Parser[LogicalPlan] =
DROP ~> DATAMAP ~> opt(IF ~> EXISTS) ~ ident ~ opt(ontable) <~ opt(";") ^^ {
case ifexists ~ dmname ~ tableIdent =>
CarbonDropDataMapCommand(dmname, ifexists.isDefined, tableIdent)
}
/**
 * The below syntax is used to show the datamaps on a table.
 * SHOW DATAMAP [ON TABLE tableName]
*/
protected lazy val showDataMap: Parser[LogicalPlan] =
SHOW ~> DATAMAP ~> opt(ontable) <~ opt(";") ^^ {
case tableIdent =>
CarbonDataMapShowCommand(tableIdent)
}
/**
 * The below syntax is used to rebuild a datamap.
 * REBUILD DATAMAP datamapName [ON TABLE tableName]
*/
protected lazy val refreshDataMap: Parser[LogicalPlan] =
REBUILD ~> DATAMAP ~> ident ~ opt(ontable) <~ opt(";") ^^ {
case datamap ~ tableIdent =>
CarbonDataMapRebuildCommand(datamap, tableIdent)
}
protected lazy val alterDataMap: Parser[LogicalPlan] =
ALTER ~> DATAMAP ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) ~
(WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",") <~ ")").? <~
opt(";") ^^ {
case dbName ~ datamap ~ (compact ~ compactType) ~ segs =>
val altertablemodel =
AlterTableModel(convertDbNameToLowerCase(dbName), datamap + "_table", None, compactType,
Some(System.currentTimeMillis()), null, segs)
CarbonAlterTableCompactionCommand(altertablemodel)
}
protected lazy val deleteRecords: Parser[LogicalPlan] =
(DELETE ~> FROM ~> aliasTable) ~ restInput.? <~ opt(";") ^^ {
case table ~ rest =>
val tableName = getTableName(table._2)
        table._3 match {
          case Some(alias) =>
            DeleteRecords(
              "select tupleId from " + tableName + " " + alias + rest.getOrElse(""),
              Some(alias),
              table._1)
          case None =>
            DeleteRecords(
              "select tupleId from " + tableName + " " + rest.getOrElse(""),
              None,
              table._1)
        }
}
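
  // Illustrative statements (names are examples only); the filter is rewritten
  // into a "select tupleId from ..." query as shown in the rule above:
  //   DELETE FROM t1 WHERE c1 = 'x'
  //   DELETE FROM t1 a WHERE a.c1 = 'x'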
protected lazy val updateTable: Parser[LogicalPlan] =
UPDATE ~> aliasTable ~
(SET ~> "(" ~> repsep(element, ",") <~ ")") ~
("=" ~> restInput) <~ opt(";") ^^ {
case tab ~ columns ~ rest =>
val (sel, where) = splitQuery(rest)
val selectPattern = """^\s*select\s+""".r
val (selectStmt, relation) =
          if (selectPattern.findFirstIn(sel.toLowerCase).isEmpty) {
            if (sel.trim.isEmpty) {
              sys.error("At least one source column has to be specified")
            }
            // only a list of expressions is given; convert it into a select
            // statement on the destination table
            val relation: UnresolvedRelation = tab._1 match {
              case r @ CarbonUnresolvedRelation(tableIdentifier) =>
                updateRelation(r, tableIdentifier, tab._4, tab._3)
              case _ => tab._1
            }
            tab._3 match {
              case Some(alias) =>
                ("select " + sel + " from " + getTableName(tab._2) + " " + alias, relation)
              case None =>
                ("select " + sel + " from " + getTableName(tab._2), relation)
            }
} else {
(sel, updateRelation(tab._1, tab._2, tab._4, tab._3))
}
        tab._3 match {
          case Some(alias) =>
            UpdateTable(relation, columns, selectStmt, Some(alias), where)
          case None =>
            UpdateTable(relation, columns, selectStmt,
              Some(tab._1.tableIdentifier.table), where)
        }
}
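
  // Illustrative statements (names are examples only). The right-hand side may be
  // a plain expression list, which the rule rewrites into a select on the target
  // table, or a full sub-select:
  //   UPDATE t1 SET (c1, c2) = ('x', 'y')
  //   UPDATE t1 SET (c1) = (select c2 from t2 where t2.id = 1)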
private def updateRelation(
r: UnresolvedRelation,
tableIdent: Seq[String],
tableIdentifier: TableIdentifier,
alias: Option[String]): UnresolvedRelation = {
alias match {
case Some(_) => r
case _ =>
val tableAlias = tableIdent match {
case Seq(dbName, tableName) => Some(tableName)
case Seq(tableName) => Some(tableName)
}
        // use reflection to pick the UnresolvedRelation constructor that matches
        // the Spark version (Spark 2.1 vs Spark 2.2)
CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, tableAlias)
}
}
protected lazy val element: Parser[String] =
(ident <~ ".").? ~ ident ^^ {
case table ~ column => column.toLowerCase
}
protected lazy val table: Parser[UnresolvedRelation] = {
rep1sep(attributeName, ".") ~ opt(ident) ^^ {
case tableIdent ~ alias => UnresolvedRelation(tableIdent)
}
}
protected lazy val aliasTable: Parser[(UnresolvedRelation, List[String], Option[String],
TableIdentifier)] = {
rep1sep(attributeName, ".") ~ opt(ident) ^^ {
case tableIdent ~ alias =>
val tableIdentifier: TableIdentifier = toTableIdentifier(tableIdent)
        // use reflection to pick the UnresolvedRelation constructor that matches
        // the Spark version (Spark 2.1 vs Spark 2.2)
val unresolvedRelation = CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, alias)
(unresolvedRelation, tableIdent, alias, tableIdentifier)
}
}
private def splitQuery(query: String): (String, String) = {
val stack = scala.collection.mutable.Stack[Char]()
var foundSingleQuotes = false
var foundDoubleQuotes = false
var foundEscapeChar = false
var ignoreChar = false
var stop = false
var bracketCount = 0
val (selectStatement, where) = query.span {
ch => {
if (stop) {
false
} else {
ignoreChar = false
if (foundEscapeChar && (ch == '\'' || ch == '\"' || ch == '\\')) {
foundEscapeChar = false
ignoreChar = true
}
          // escaped single or double quotes are not treated as string delimiters
if (!ignoreChar) {
if (ch == '\\') {
foundEscapeChar = true
} else if (ch == '\'') {
foundSingleQuotes = !foundSingleQuotes
} else if (ch == '\"') {
foundDoubleQuotes = !foundDoubleQuotes
            } else if (ch == '(' && !foundSingleQuotes && !foundDoubleQuotes) {
              bracketCount = bracketCount + 1
              stack.push(ch)
            } else if (ch == ')' && !foundSingleQuotes && !foundDoubleQuotes) {
              // bracketCount also counts closing brackets; its parity is used
              // as a cheap balance check after the scan
              bracketCount = bracketCount + 1
              stack.pop()
              if (stack.isEmpty) {
                stop = true
              }
            }
}
true
}
}
}
    // a balanced query contributes an even, non-zero bracket count
    if (bracketCount == 0 || bracketCount % 2 != 0) {
      sys.error("Parsing error, missing bracket")
    }
val select = selectStatement.trim
select.substring(1, select.length - 1).trim -> where.trim
}
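
  // Illustrative behaviour of splitQuery on the text after "=" in an UPDATE
  // (values are examples only):
  //   splitQuery("('x', 'y') where c1 = 1") == ("'x', 'y'", "where c1 = 1")
  // The outer brackets are stripped from the first part, and brackets inside
  // quoted or escaped strings are ignored while scanning.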
protected lazy val attributeName: Parser[String] = acceptMatch("attribute name", {
case lexical.Identifier(str) => str.toLowerCase
case lexical.Keyword(str) if !lexical.delimiters.contains(str) => str.toLowerCase
})
private def getTableName(tableIdentifier: Seq[String]): String = {
if (tableIdentifier.size > 1) {
tableIdentifier.head + "." + tableIdentifier(1)
} else {
tableIdentifier.head
}
}
protected lazy val loadDataNew: Parser[LogicalPlan] =
LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
(INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
(PARTITION ~>"("~> repsep(partitions, ",") <~ ")").? ~
(OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
case filePath ~ isOverwrite ~ table ~ partitions ~ optionsList =>
val (databaseNameOp, tableName) = table match {
case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
}
if (optionsList.isDefined) {
validateOptions(optionsList)
}
val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
val partitionSpec = partitions.getOrElse(List.empty[(String, Option[String])]).toMap
CarbonLoadDataCommand(
databaseNameOp = convertDbNameToLowerCase(databaseNameOp),
tableName = tableName,
factPathFromUser = filePath,
dimFilesPath = Seq(),
options = optionsMap,
isOverwriteTable = isOverwrite.isDefined,
inputSqlString = null,
dataFrame = None,
updateModel = None,
tableInfoOp = None,
internalOptions = Map.empty,
partition = partitionSpec)
}
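
  // Illustrative statement (the path and OPTIONS keys are examples only):
  //   LOAD DATA INPATH 'hdfs://host/data/sample.csv' INTO TABLE db1.t1
  //   OPTIONS('DELIMITER'=',', 'HEADER'='true')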
protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~
(WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",")) <~ ")" ~
opt(";") ^^ {
case dbName ~ tableName ~ loadids =>
CarbonDeleteLoadByIdCommand(loadids, dbName, tableName.toLowerCase())
}
protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
(WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~
opt(";") ^^ {
case database ~ table ~ condition =>
condition match {
case dateField ~ dateValue =>
CarbonDeleteLoadByLoadDateCommand(convertDbNameToLowerCase(database),
table.toLowerCase(),
dateField,
dateValue)
}
}
/**
* ALTER TABLE [dbName.]tableName ADD SEGMENT
* OPTIONS('path'='path','format'='format', ['partition'='schema list'])
*
* schema list format: column_name:data_type
* for example: 'partition'='a:int,b:string'
*/
protected lazy val addLoad: Parser[LogicalPlan] =
ALTER ~ TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> SEGMENT) ~
(OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")") <~ opt(";") ^^ {
case dbName ~ tableName ~ segment ~ optionsList =>
CarbonAddLoadCommand(dbName, tableName, optionsList.toMap)
}
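
  // Illustrative statement matching the documented syntax above (the path and
  // format values are examples only):
  //   ALTER TABLE db1.t1 ADD SEGMENT OPTIONS('path'='hdfs://host/seg0', 'format'='parquet')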
/**
* INSERT INTO [dbName.]tableName STAGE
*/
protected lazy val insertStageData: Parser[LogicalPlan] =
INSERT ~ INTO ~> (ident <~ ".").? ~ ident <~ STAGE <~ opt(";") ^^ {
case dbName ~ tableName =>
CarbonInsertFromStageCommand(dbName, tableName)
}
protected lazy val cleanFiles: Parser[LogicalPlan] =
CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
case databaseName ~ tableName =>
CarbonCleanFilesCommand(
convertDbNameToLowerCase(databaseName),
Option(tableName.toLowerCase()))
}
protected lazy val explainPlan: Parser[LogicalPlan] =
(EXPLAIN ~> opt(EXTENDED)) ~ startCommand ^^ {
case isExtended ~ logicalPlan =>
logicalPlan match {
case _: CarbonCreateTableCommand =>
ExplainCommand(logicalPlan, extended = isExtended.isDefined)
case _ => CarbonToSparkAdapter.getExplainCommandObj
}
}
protected lazy val showLoads: Parser[LogicalPlan] =
(SHOW ~> opt(HISTORY) <~ SEGMENTS <~ FOR <~ TABLE) ~ (ident <~ ".").? ~ ident ~
(LIMIT ~> numericLit).? <~
opt(";") ^^ {
case showHistory ~ databaseName ~ tableName ~ limit =>
CarbonShowLoadsCommand(
convertDbNameToLowerCase(databaseName), tableName.toLowerCase(), limit,
showHistory.isDefined)
}
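
  // Illustrative statements (names are examples only):
  //   SHOW SEGMENTS FOR TABLE db1.t1 LIMIT 10
  //   SHOW HISTORY SEGMENTS FOR TABLE db1.t1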
protected lazy val showCache: Parser[LogicalPlan] =
SHOW ~> METACACHE ~> opt(ontable) <~ opt(";") ^^ {
case table =>
CarbonShowCacheCommand(table)
}
protected lazy val dropCache: Parser[LogicalPlan] =
DROP ~> METACACHE ~> ontable <~ opt(";") ^^ {
case table =>
CarbonDropCacheCommand(table)
}
protected lazy val cli: Parser[LogicalPlan] =
CARBONCLI ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~
(OPTIONS ~> "(" ~> commandOptions <~ ")") <~ opt(";") ^^ {
case databaseName ~ tableName ~ commandOptions =>
CarbonCliCommand(
convertDbNameToLowerCase(databaseName),
tableName.toLowerCase(),
commandOptions)
}
protected lazy val alterTableColumnRenameAndModifyDataType: Parser[LogicalPlan] =
ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ CHANGE ~ ident ~ ident ~
ident ~ opt("(" ~> rep1sep(valueOptions, ",") <~ ")") <~ opt(";") ^^ {
case dbName ~ table ~ change ~ columnName ~ columnNameCopy ~ dataType ~ values =>
        // if the two column names differ, this is a column-rename request
        val isColumnRename = !columnName.equalsIgnoreCase(columnNameCopy)
val alterTableColRenameAndDataTypeChangeModel =
AlterTableDataTypeChangeModel(parseDataType(dataType.toLowerCase,
values,
isColumnRename),
convertDbNameToLowerCase(dbName),
table.toLowerCase,
columnName.toLowerCase,
columnNameCopy.toLowerCase,
isColumnRename)
CarbonAlterTableColRenameDataTypeChangeCommand(alterTableColRenameAndDataTypeChangeModel)
}
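
  // Illustrative statements (names are examples only). The rule treats the
  // command as a rename when the two column names differ:
  //   ALTER TABLE t1 CHANGE c1 c1 bigint          -- data type change only
  //   ALTER TABLE t1 CHANGE c1 c1_new bigint      -- rename plus type change
  //   ALTER TABLE t1 CHANGE d1 d1 decimal(10,2)   -- precision/scale via valueOptions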
protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~
(ADD ~> COLUMNS ~> "(" ~> repsep(anyFieldDef, ",") <~ ")") ~
(TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
case dbName ~ table ~ fields ~ tblProp =>
fields.foreach{ f =>
if (isComplexDimDictionaryExclude(f.dataType.get)) {
throw new MalformedCarbonCommandException(
s"Add column is unsupported for complex datatype column: ${f.column}")
}
}
val tableProps = if (tblProp.isDefined) {
        tblProp.get.groupBy(_._1.toLowerCase).foreach { f =>
          if (f._2.size > 1) {
            val name = f._1.toLowerCase
            // check the prefix before slicing, so short duplicated keys cannot
            // trigger a StringIndexOutOfBoundsException
            if (name.startsWith("default.value.")) {
              val colName = name.substring("default.value.".length)
              if (fields.count(p => p.column.equalsIgnoreCase(colName)) == 1) {
                sys.error(s"Duplicate default value exists for new column: ${ colName }")
              }
            }
          }
        }
// default value should not be converted to lower case
val tblProps = tblProp.get
.map(f => if (CarbonCommonConstants.TABLE_BLOCKSIZE.equalsIgnoreCase(f._1) ||
CarbonCommonConstants.SORT_COLUMNS.equalsIgnoreCase(f._1) ||
CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE.equalsIgnoreCase(f._1) ||
CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD.equalsIgnoreCase(f._1)) {
throw new MalformedCarbonCommandException(
s"Unsupported Table property in add column: ${ f._1 }")
} else if (f._1.toLowerCase.startsWith("default.value.")) {
if (fields.count(field => checkFieldDefaultValue(field.column,
f._1.toLowerCase)) == 1) {
f._1 -> f._2
} else {
throw new MalformedCarbonCommandException(
s"Default.value property does not matches with the columns in ALTER command. " +
s"Column name in property is: ${ f._1}")
}
} else {
f._1 -> f._2.toLowerCase
})
scala.collection.mutable.Map(tblProps: _*)
} else {
scala.collection.mutable.Map.empty[String, String]
}
        val tableModel = prepareTableModel(false,
convertDbNameToLowerCase(dbName),
table.toLowerCase,
fields.map(convertFieldNamesToLowercase),
Seq.empty,
tableProps,
None,
true)
val alterTableAddColumnsModel = AlterTableAddColumnsModel(
convertDbNameToLowerCase(dbName),
table,
tableProps.toMap,
tableModel.dimCols,
tableModel.msrCols,
tableModel.highcardinalitydims.getOrElse(Seq.empty))
CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
}
private def checkFieldDefaultValue(fieldName: String, defaultValueColumnName: String): Boolean = {
defaultValueColumnName.equalsIgnoreCase("default.value." + fieldName)
}
private def convertFieldNamesToLowercase(field: Field): Field = {
val name = field.column.toLowerCase
field.copy(column = name, name = Some(name))
}
protected lazy val alterTableDropColumn: Parser[LogicalPlan] =
ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ DROP ~ COLUMNS ~
("(" ~> rep1sep(ident, ",") <~ ")") <~ opt(";") ^^ {
case dbName ~ table ~ drop ~ columns ~ values =>
        // validate that no column name is repeated
        values.map(_.toLowerCase).groupBy(identity).foreach {
          case (x, ys) if ys.lengthCompare(1) > 0 =>
            throw new MalformedCarbonCommandException(
              s"$x is duplicated. Duplicate columns are not allowed")
          case _ =>
        }
val alterTableDropColumnModel = AlterTableDropColumnModel(convertDbNameToLowerCase(dbName),
table.toLowerCase,
values.map(_.toLowerCase))
CarbonAlterTableDropColumnCommand(alterTableDropColumnModel)
}
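
  // Illustrative statement (names are examples only):
  //   ALTER TABLE db1.t1 DROP COLUMNS (c1, c2)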
def getFields(schema: Seq[StructField]): Seq[Field] = {
schema.map { col =>
var columnComment: String = ""
var plainComment: String = ""
if (col.getComment().isDefined) {
columnComment = " comment \"" + col.getComment().get + "\""
plainComment = col.getComment().get
}
val x =
if (col.dataType.catalogString == "float") {
'`' + col.name + '`' + " double" + columnComment
} else {
'`' + col.name + '`' + ' ' + col.dataType.catalogString + columnComment
}
val f: Field = anyFieldDef(new lexical.Scanner(x.toLowerCase))
match {
case Success(field, _) => field.asInstanceOf[Field]
case failureOrError => throw new MalformedCarbonCommandException(
s"Unsupported data type: ${ col.dataType }")
}
      // a decimal type's catalog string looks like decimal(10,0), so detect it by
      // prefix, extract the precision and scale, and reset the data type to "decimal"
if (f.dataType.getOrElse("").startsWith("decimal")) {
val (precision, scale) = CommonUtil.getScaleAndPrecision(col.dataType.catalogString)
f.precision = precision
f.scale = scale
f.dataType = Some("decimal")
}
      if (f.dataType.getOrElse("").startsWith("char")) {
        f.dataType = Some("char")
      } else if (f.dataType.getOrElse("").startsWith("float")) {
        f.dataType = Some("double")
      }
f.rawSchema = x
f.columnComment = plainComment
f
}
}
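
  // Illustrative behaviour (a sketch; column names are examples only): a float
  // column is widened to double and a decimal keeps its precision and scale:
  //   getFields(Seq(StructField("f1", FloatType), StructField("d1", DecimalType(10, 2))))
  // yields Fields whose dataType is Some("double") and Some("decimal") respectively.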
def addMVSkipFunction(sql: String): String = {
addMVSkipUDF(new lexical.Scanner(sql)) match {
case Success(query, _) => query
case _ =>
throw new MalformedCarbonCommandException(s"Unsupported query")
}
}
def getBucketFields(
properties: mutable.Map[String, String],
fields: Seq[Field],
options: CarbonOption): Option[BucketFields] = {
    if (!CommonUtil.validateTblProperties(properties, fields)) {
      throw new MalformedCarbonCommandException("Invalid table properties")
    }
if (options.isBucketingEnabled) {
      if (options.bucketNumber.toString.contains("-") ||
          options.bucketNumber.toString.contains("+") || options.bucketNumber == 0) {
        throw new MalformedCarbonCommandException("INVALID NUMBER OF BUCKETS SPECIFIED")
      } else {
        Some(BucketFields(options.bucketColumns.toLowerCase.split(",").map(_.trim),
          options.bucketNumber))
      }
} else {
None
}
}
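
  // Illustrative behaviour (a sketch; values are examples only): with bucketing
  // enabled, bucketNumber = 4 and bucketColumns = "c1,c2" yield
  // Some(BucketFields(Seq("c1", "c2"), 4)); a zero or signed bucket number is
  // rejected with MalformedCarbonCommandException.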
}