/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import java.util
import java.util.UUID

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.util.CarbonException

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema._
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema}
import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.CompactionType
import org.apache.carbondata.spark.CarbonSparkFactory
import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
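
/**
 * Captures everything parsed from a CREATE TABLE statement. A minimal sketch of
 * how an instance might be built (hypothetical values, not taken from this file):
 * {{{
 * val model = TableModel(
 *   ifNotExistsSet = false,
 *   databaseNameOp = Some("default"),
 *   tableName = "sales",
 *   tableProperties = Map("sort_columns" -> "id"),
 *   dimCols = Seq(Field("id", Some("int"), Some("id"), None)),
 *   msrCols = Seq(Field("amount", Some("double"), Some("amount"), None)),
 *   sortKeyDims = Some(Seq("id")),
 *   varcharCols = Some(Seq()),
 *   highcardinalitydims = None,
 *   noInvertedIdxCols = None,
 *   innvertedIdxCols = None,
 *   bucketFields = None,
 *   partitionInfo = None)
 * }}}
 */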
case class TableModel(
ifNotExistsSet: Boolean,
databaseNameOp: Option[String],
tableName: String,
tableProperties: Map[String, String],
dimCols: Seq[Field],
msrCols: Seq[Field],
sortKeyDims: Option[Seq[String]],
varcharCols: Option[Seq[String]],
highcardinalitydims: Option[Seq[String]],
noInvertedIdxCols: Option[Seq[String]],
innvertedIdxCols: Option[Seq[String]],
colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None,
bucketFields: Option[BucketFields],
partitionInfo: Option[PartitionInfo],
tableComment: Option[String] = None,
var parentTable: Option[CarbonTable] = None,
var dataMapRelation: Option[scala.collection.mutable.LinkedHashMap[Field, DataMapField]] = None)
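
/**
 * Note that [[Field]] equality is by column name only and is case-insensitive,
 * so two fields that differ only in data type still compare as equal:
 * {{{
 * Field("ID", Some("int"), None, None) == Field("id", Some("string"), None, None) // true
 * }}}
 */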
case class Field(column: String, var dataType: Option[String], name: Option[String],
children: Option[List[Field]], parent: String = null,
storeType: Option[String] = Some("columnar"),
var schemaOrdinal: Int = -1,
var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "",
var columnComment: String = "") {
override def equals(o: Any) : Boolean = o match {
case that: Field =>
that.column.equalsIgnoreCase(this.column)
case _ => false
}
override def hashCode : Int = column.hashCode
}

case class DataMapField(var aggregateFunction: String = "",
columnTableRelationList: Option[Seq[ColumnTableRelation]] = None)

case class ColumnTableRelation(parentColumnName: String, parentColumnId: String,
parentTableName: String, parentDatabaseName: String, parentTableId: String)

case class ColumnProperty(key: String, value: String)

case class ComplexField(complexType: String, primitiveField: Option[Field],
complexField: Option[ComplexField])

case class PartitionerField(partitionColumn: String, dataType: Option[String],
columnComment: String)

case class BucketFields(bucketColumns: Seq[String], numberOfBuckets: Int)

case class DataLoadTableFileMapping(table: String, loadPath: String)

case class ExecutionErrors(var failureCauses: FailureCauses, var errorMsg: String)

case class CarbonMergerMapping(
hdfsStoreLocation: String,
metadataFilePath: String,
var mergedLoadName: String,
databaseName: String,
factTableName: String,
validSegments: Array[Segment],
tableId: String,
campactionType: CompactionType,
// maxSegmentColCardinality is the column cardinality of the last segment of the compaction
var maxSegmentColCardinality: Array[Int],
// maxSegmentColumnSchemaList is the list of column schemas of the last segment of the compaction
var maxSegmentColumnSchemaList: List[ColumnSchema],
@transient currentPartitions: Option[Seq[PartitionSpec]])

case class NodeInfo(TaskId: String, noOfBlocks: Int)

case class AlterTableModel(
dbName: Option[String],
tableName: String,
segmentUpdateStatusManager: Option[SegmentUpdateStatusManager],
compactionType: String,
factTimeStamp: Option[Long],
var alterSql: String,
customSegmentIds: Option[List[String]] = None)

case class UpdateTableModel(
isUpdate: Boolean,
updatedTimeStamp: Long,
var executorErrors: ExecutionErrors,
deletedSegments: Seq[Segment])

case class CompactionModel(compactionSize: Long,
compactionType: CompactionType,
carbonTable: CarbonTable,
isDDLTrigger: Boolean,
currentPartitions: Option[Seq[PartitionSpec]],
customSegmentIds: Option[List[String]])

case class CompactionCallableModel(carbonLoadModel: CarbonLoadModel,
carbonTable: CarbonTable,
loadsToMerge: util.List[LoadMetadataDetails],
sqlContext: SQLContext,
compactionType: CompactionType,
currentPartitions: Option[Seq[PartitionSpec]],
compactedSegments: java.util.List[String])

case class AlterPartitionModel(carbonLoadModel: CarbonLoadModel,
segmentId: String,
oldPartitionIds: List[Int],
sqlContext: SQLContext)

case class SplitPartitionCallableModel(carbonLoadModel: CarbonLoadModel,
segmentId: String,
partitionId: String,
oldPartitionIds: List[Int],
sqlContext: SQLContext)

case class DropPartitionCallableModel(carbonLoadModel: CarbonLoadModel,
segmentId: Segment,
partitionId: String,
oldPartitionIds: List[Int],
dropWithData: Boolean,
carbonTable: CarbonTable,
sqlContext: SQLContext)

case class DataTypeInfo(dataType: String, precision: Int = 0, scale: Int = 0)

class AlterTableColumnRenameModel(columnName: String,
newColumnName: String,
isColumnRename: Boolean)

case class AlterTableDataTypeChangeModel(dataTypeInfo: DataTypeInfo,
databaseName: Option[String],
tableName: String,
columnName: String,
newColumnName: String,
isColumnRename: Boolean)
extends AlterTableColumnRenameModel(columnName, newColumnName, isColumnRename)

case class AlterTableRenameModel(
oldTableIdentifier: TableIdentifier,
newTableIdentifier: TableIdentifier)

case class AlterTableAddColumnsModel(
databaseName: Option[String],
tableName: String,
tableProperties: Map[String, String],
dimCols: Seq[Field],
msrCols: Seq[Field],
highCardinalityDims: Seq[String])

case class AlterTableDropColumnModel(databaseName: Option[String],
tableName: String,
columns: List[String])

case class AlterTableDropPartitionModel(databaseName: Option[String],
tableName: String,
partitionId: String,
dropWithData: Boolean)

case class AlterTableSplitPartitionModel(databaseName: Option[String],
tableName: String,
partitionId: String,
splitInfo: List[String])

class AlterTableColumnSchemaGenerator(
alterTableModel: AlterTableAddColumnsModel,
dbName: String,
tableInfo: TableInfo,
tableIdentifier: AbsoluteTableIdentifier,
sc: SparkContext) {

private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
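
// Note: the check below is a plain substring match against the raw comma-separated
// 'sort_columns' property value (e.g. with 'sort_columns'='id,name', both "id" and
// "d,n" would match). When the property is absent, every new column is treated as
// a sort column.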
private def isSortColumn(columnName: String): Boolean = {
val sortColumns = alterTableModel.tableProperties.get("sort_columns")
if(sortColumns.isDefined) {
sortColumns.get.contains(columnName)
} else {
true
}
}

private def isVarcharColumn(columnName: String): Boolean = {
val varcharColumns = alterTableModel.tableProperties.get("long_string_columns")
if (varcharColumns.isDefined) {
varcharColumns.get.contains(columnName)
} else {
false
}
}
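
/**
 * Builds the schemas for the newly added columns and re-arranges the complete
 * column list into Carbon's expected layout: original dimensions, new dimensions,
 * original long string (varchar) columns, new long string columns, complex type
 * columns, original measures, new measures; partition columns are then moved to
 * the end. For a hypothetical `ALTER TABLE t ADD COLUMNS (c1 string)` on a table
 * (a string, m int), the list becomes (a, c1, m) and only c1 is returned.
 */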
def process: Seq[ColumnSchema] = {
val tableSchema = tableInfo.getFactTable
val tableCols = tableSchema.getListOfColumns.asScala
val existingColsSize = tableCols.size
var longStringCols = Seq[ColumnSchema]()
// get all original dimension columns
// but exclude complex type columns and long string columns
var allColumns = tableCols.filter(x =>
(x.isDimensionColumn && !x.getDataType.isComplexType()
&& x.getSchemaOrdinal != -1 && (x.getDataType != DataTypes.VARCHAR)))
var newCols = Seq[ColumnSchema]()
var invertedIndxCols: Array[String] = Array[String]()
if (alterTableModel.tableProperties.get(CarbonCommonConstants.INVERTED_INDEX).isDefined) {
invertedIndxCols = alterTableModel.tableProperties(CarbonCommonConstants.INVERTED_INDEX)
.split(',').map(_.trim)
}
// add new dimension columns
alterTableModel.dimCols.foreach(field => {
val encoders = new java.util.ArrayList[Encoding]()
encoders.add(Encoding.DICTIONARY)
val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
field,
encoders,
isDimensionCol = true,
field.precision,
field.scale,
field.schemaOrdinal + existingColsSize,
alterTableModel.highCardinalityDims,
alterTableModel.databaseName.getOrElse(dbName),
isSortColumn(field.name.getOrElse(field.column)),
isVarcharColumn(field.name.getOrElse(field.column)))
if (columnSchema.getDataType == DataTypes.VARCHAR) {
// put the new long string columns in 'longStringCols'
// and add them after old long string columns
longStringCols ++= Seq(columnSchema)
} else {
allColumns ++= Seq(columnSchema)
}
newCols ++= Seq(columnSchema)
})
// put the old long string columns
allColumns ++= tableCols.filter(x =>
(x.isDimensionColumn && (x.getDataType == DataTypes.VARCHAR)))
// put the new long string columns
allColumns ++= longStringCols
// put complex type columns at the end of dimension columns
allColumns ++= tableCols.filter(x =>
(x.isDimensionColumn && (x.getDataType.isComplexType() || x.getSchemaOrdinal == -1)))
// original measure columns
allColumns ++= tableCols.filter(x => !x.isDimensionColumn)
// add new measure columns
alterTableModel.msrCols.foreach(field => {
val encoders = new java.util.ArrayList[Encoding]()
val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
field,
encoders,
isDimensionCol = false,
field.precision,
field.scale,
field.schemaOrdinal + existingColsSize,
alterTableModel.highCardinalityDims,
alterTableModel.databaseName.getOrElse(dbName)
)
allColumns ++= Seq(columnSchema)
newCols ++= Seq(columnSchema)
})
if (invertedIndxCols.nonEmpty) {
for (column <- newCols) {
if (invertedIndxCols.contains(column.getColumnName) && column.isDimensionColumn) {
column.setUseInvertedIndex(true)
}
}
}
// Check whether there are any duplicate measures or dimensions,
// based on the dimension and measure names.
allColumns.filter(x => !x.isInvisible).groupBy(_.getColumnName)
.foreach(f => if (f._2.size > 1) {
val name = f._1
sys.error(s"Duplicate column found with name: $name")
})
if (newCols.exists(_.getDataType.isComplexType)) {
sys.error("Complex column cannot be added")
}
val columnValidator = CarbonSparkFactory.getCarbonColumnValidator
columnValidator.validateColumns(allColumns)
allColumns = CarbonScalaUtil.reArrangeColumnSchema(allColumns)
if (tableInfo.getFactTable.getPartitionInfo != null) {
val par = tableInfo.getFactTable.getPartitionInfo.getColumnSchemaList
allColumns = allColumns.filterNot(b => par.contains(b)) ++= par.asScala
}
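
// Splits the given columns into local-dictionary include and exclude lists based on
// the LOCAL_DICTIONARY_INCLUDE / LOCAL_DICTIONARY_EXCLUDE table properties. When
// neither property is set, all string, varchar and complex (ARRAY/STRUCT) columns
// are treated as include columns. A hypothetical example: with
// 'local_dictionary_include'='name' and columns (name string, city string), the
// result is include = (name), exclude = (city).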
def getLocalDictColumnList(tableProperties: mutable.Map[String, String],
columns: mutable.ListBuffer[ColumnSchema]):
(mutable.ListBuffer[ColumnSchema], mutable.ListBuffer[ColumnSchema]) = {
val includeColumns = new scala.collection.mutable.ListBuffer[ColumnSchema]
val excludeColumns = new scala.collection.mutable.ListBuffer[ColumnSchema]
val localDictIncludeColumns = if (tableProperties
.get(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE).isDefined) {
tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE)
} else {
null
}
val localDictExcludeColumns = if (tableProperties
.get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).isDefined) {
tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE)
} else {
null
}
if (null != localDictIncludeColumns) {
if (null == localDictExcludeColumns) {
columns.foreach { column =>
if (localDictIncludeColumns.contains(column.getColumnName)) {
includeColumns.append(column)
} else {
if (column.getDataType.equals(DataTypes.STRING) ||
column.getDataType.equals(DataTypes.VARCHAR) ||
column.getDataType.toString.equals("ARRAY") ||
column.getDataType.toString.equals("STRUCT")) {
excludeColumns.append(column)
}
}
}
} else {
columns.foreach { column =>
if (localDictIncludeColumns.contains(column.getColumnName) &&
!localDictExcludeColumns.contains(column.getColumnName)) {
includeColumns.append(column)
} else if (localDictExcludeColumns.contains(column.getColumnName)) {
excludeColumns.append(column)
}
}
}
} else {
if (null == localDictExcludeColumns) {
columns.foreach { column =>
if (column.getDataType.equals(DataTypes.STRING) ||
column.getDataType.equals(DataTypes.VARCHAR) ||
column.getDataType.toString.equals("ARRAY") ||
column.getDataType.toString.equals("STRUCT")) {
includeColumns.append(column)
}
}
} else {
columns.foreach { column =>
if (!localDictExcludeColumns.contains(column.getColumnName) &&
(column.getDataType.equals(DataTypes.STRING) ||
column.getDataType.equals(DataTypes.VARCHAR) ||
column.getDataType.toString.equals("ARRAY") ||
column.getDataType.toString.equals("STRUCT"))) {
includeColumns.append(column)
} else if (localDictExcludeColumns.contains(column.getColumnName)) {
excludeColumns.append(column)
}
}
}
}
(includeColumns, excludeColumns)
}
val columnsWithoutNewCols = new scala.collection.mutable.ListBuffer[ColumnSchema]
allColumns.foreach { column =>
if (!newCols.exists(x => x.getColumnName.equalsIgnoreCase(column.getColumnName))) {
columnsWithoutNewCols += column
}
}
val isLocalDictEnabledForMainTable = tableSchema.getTableProperties
.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
val alterMutableTblProperties: scala.collection.mutable.Map[String, String] = mutable
.Map(alterTableModel.tableProperties.toSeq: _*)
// if local dictionary is enabled, then validate include and exclude columns if defined
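// A hypothetical example that reaches this branch: the main table was created with
// TBLPROPERTIES('local_dictionary_enable'='true') and the ALTER TABLE statement
// supplies 'local_dictionary_include'='name,city' for the newly added columns.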
if (null != isLocalDictEnabledForMainTable && isLocalDictEnabledForMainTable.toBoolean) {
var localDictIncludeColumns: Seq[String] = Seq[String]()
var localDictExcludeColumns: Seq[String] = Seq[String]()
// validate local dictionary include columns if defined
if (alterTableModel.tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE)
.isDefined) {
localDictIncludeColumns =
alterTableModel.tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE).split(",")
.map(_.trim)
CarbonScalaUtil
.validateLocalDictionaryColumns(alterMutableTblProperties, localDictIncludeColumns)
CarbonScalaUtil
.validateLocalConfiguredDictionaryColumns(
alterTableModel.dimCols ++ alterTableModel.msrCols,
alterMutableTblProperties,
localDictIncludeColumns)
}
// validate local dictionary exclude columns if defined
if (alterTableModel.tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE)
.isDefined) {
localDictExcludeColumns =
alterTableModel.tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE).split(",")
.map(_.trim)
CarbonScalaUtil
.validateLocalDictionaryColumns(alterMutableTblProperties, localDictExcludeColumns)
CarbonScalaUtil
.validateLocalConfiguredDictionaryColumns(
alterTableModel.dimCols ++ alterTableModel.msrCols,
alterMutableTblProperties,
localDictExcludeColumns)
}
// validate if both local dictionary include and exclude contains same column
CarbonScalaUtil.validateDuplicateLocalDictIncludeExcludeColmns(alterMutableTblProperties)
CarbonUtil
.setLocalDictColumnsToWrapperSchema(newCols.asJava,
alterTableModel.tableProperties.asJava,
isLocalDictEnabledForMainTable)
}
val includeExcludeColOfMainTable = getLocalDictColumnList(
tableSchema.getTableProperties.asScala, columnsWithoutNewCols)
val includeExcludeColOfAlterTable = getLocalDictColumnList(alterMutableTblProperties,
newCols.to[mutable.ListBuffer])
// Append all Local Dictionary Include and Exclude columns of Alter Table to that of Main Table
includeExcludeColOfMainTable._1.appendAll(includeExcludeColOfAlterTable._1)
includeExcludeColOfMainTable._2.appendAll(includeExcludeColOfAlterTable._2)
val localDictionaryIncludeColumn = new StringBuilder
val localDictionaryExcludeColumn = new StringBuilder
includeExcludeColOfMainTable._1.foreach { column =>
localDictionaryIncludeColumn.append(column.getColumnName).append(",")
}
includeExcludeColOfMainTable._2.foreach { column =>
localDictionaryExcludeColumn.append(column.getColumnName).append(",")
}
// populate table properties map
val tablePropertiesMap = tableSchema.getTableProperties
alterTableModel.tableProperties.foreach {
case (key, mapValue) =>
val value = tablePropertiesMap.get(key)
if (null != value) {
if (value != mapValue) {
tablePropertiesMap.put(key, value + "," + mapValue)
}
} else {
tablePropertiesMap.put(key, mapValue)
}
}
// The Final Map should contain the combined Local Dictionary Include and
// Local Dictionary Exclude Columns from both Main table and Alter table
tablePropertiesMap
.put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, localDictionaryIncludeColumn.toString())
tablePropertiesMap
.put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, localDictionaryExcludeColumn.toString())
// Set the default value for each newly added column when a valid
// 'default.value.<column_name>' table property is provided; also honor
// 'no_inverted_index' for the newly added columns.
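// A hypothetical example: ALTER TABLE t ADD COLUMNS (price double)
// TBLPROPERTIES('default.value.price'='0.0') reaches this loop with
// elem = ("default.value.price", "0.0") and sets 0.0 as the default for 'price'.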
val defaultValueString = "default.value."
newCols.foreach { col =>
var rawData: String = null
for (elem <- alterTableModel.tableProperties) {
if (elem._1.toLowerCase.startsWith(defaultValueString)) {
if (col.getColumnName.equalsIgnoreCase(elem._1.substring(defaultValueString.length))) {
rawData = elem._2
val data = DataTypeUtil.convertDataToBytesBasedOnDataType(elem._2, col)
if (null != data) {
col.setDefaultValue(data)
} else {
LOGGER.error("Invalid default value for new column " + dbName + "." +
alterTableModel.tableName + "." + col.getColumnName + " : " + elem._2)
}
}
}
else if (elem._1.equalsIgnoreCase("no_inverted_index") &&
(elem._2.split(",").contains(col.getColumnName))) {
col.getEncodingList.remove(Encoding.INVERTED_INDEX)
}
}
}
tableSchema.setListOfColumns(allColumns.asJava)
tableInfo.setLastUpdatedTime(System.currentTimeMillis())
tableInfo.setFactTable(tableSchema)
newCols
}
}

// TODO: move this to carbon store API
object TableNewProcessor {
def apply(cm: TableModel): TableInfo = {
new TableNewProcessor(cm).process
}
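
/**
 * Builds a ColumnSchema for a single field. A minimal sketch of a call site
 * (hypothetical values, mirroring how the alter-table flow above uses it):
 * {{{
 * val encoders = new java.util.ArrayList[Encoding]()
 * encoders.add(Encoding.DICTIONARY)
 * val schema = TableNewProcessor.createColumnSchema(
 *   Field("city", Some("string"), None, None),
 *   encoders,
 *   isDimensionCol = true,
 *   precision = 0,
 *   scale = 0,
 *   schemaOrdinal = 0,
 *   highCardinalityDims = Seq("city"), // removes the DICTIONARY encoding again
 *   databaseName = "default")
 * }}}
 */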
def createColumnSchema(
field: Field,
encoders: java.util.List[Encoding],
isDimensionCol: Boolean,
precision: Int,
scale: Int,
schemaOrdinal: Int,
highCardinalityDims: Seq[String],
databaseName: String,
isSortColumn: Boolean = false,
isVarcharColumn: Boolean = false): ColumnSchema = {
val dataType = DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse(""))
if (DataTypes.isDecimal(dataType)) {
dataType.asInstanceOf[DecimalType].setPrecision(field.precision)
dataType.asInstanceOf[DecimalType].setScale(field.scale)
}
val columnSchema = new ColumnSchema()
if (isVarcharColumn) {
columnSchema.setDataType(DataTypes.VARCHAR)
} else {
columnSchema.setDataType(dataType)
}
val colName = field.name.getOrElse(field.column)
columnSchema.setColumnName(colName)
if (highCardinalityDims.contains(colName)) {
encoders.remove(Encoding.DICTIONARY)
}
if (dataType == DataTypes.DATE) {
encoders.add(Encoding.DIRECT_DICTIONARY)
}
if (dataType == DataTypes.TIMESTAMP && !highCardinalityDims.contains(colName)) {
encoders.add(Encoding.DIRECT_DICTIONARY)
}
columnSchema.setEncodingList(encoders)
val colUniqueIdGenerator = ColumnUniqueIdGenerator.getInstance
val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
columnSchema.setColumnUniqueId(columnUniqueId)
columnSchema.setColumnReferenceId(columnUniqueId)
columnSchema.setDimensionColumn(isDimensionCol)
columnSchema.setPrecision(precision)
columnSchema.setScale(scale)
columnSchema.setSchemaOrdinal(schemaOrdinal)
columnSchema.setSortColumn(isSortColumn)
columnSchema
}
}

class TableNewProcessor(cm: TableModel) {

def getAllChildren(fieldChildren: Option[List[Field]],
useDictionaryEncoding: Boolean): Seq[ColumnSchema] = {
var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
fieldChildren.foreach(fields => {
fields.foreach(field => {
val encoders = new java.util.ArrayList[Encoding]()
if (useDictionaryEncoding) {
encoders.add(Encoding.DICTIONARY)
}
val columnSchema: ColumnSchema = getColumnSchema(
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
field.name.getOrElse(field.column),
encoders,
true,
field,
cm.dataMapRelation,
useDictionaryEncoding = useDictionaryEncoding)
allColumns ++= Seq(columnSchema)
if (field.children.get != null) {
columnSchema.setNumberOfChild(field.children.get.size)
allColumns ++= getAllChildren(field.children, useDictionaryEncoding)
}
})
})
allColumns
}

// a varchar column is a string column that is listed in 'long_string_columns'
private def isVarcharColumn(colName: String): Boolean = {
cm.varcharCols.get.contains(colName)
}
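
/**
 * Builds a ColumnSchema for a field of this table model. When `map` holds a
 * DataMapField for the field (the pre-aggregate case), the aggregate function and
 * the parent column relations are copied onto the schema; otherwise the
 * dictionary/direct-dictionary encodings are decided here from the field's type.
 */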
def getColumnSchema(
dataType: DataType,
colName: String,
encoders: java.util.List[Encoding],
isDimensionCol: Boolean,
field: Field,
map: Option[scala.collection.mutable.LinkedHashMap[Field, DataMapField]],
useDictionaryEncoding: Boolean = true) : ColumnSchema = {
val columnSchema = new ColumnSchema()
columnSchema.setDataType(dataType)
columnSchema.setColumnName(colName)
val isParentColumnRelation = map.isDefined && map.get.get(field).isDefined
if (!isParentColumnRelation) {
val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
if (highCardinalityDims.contains(colName)) {
encoders.remove(Encoding.DICTIONARY)
}
if (dataType == DataTypes.DATE && useDictionaryEncoding) {
encoders.add(Encoding.DIRECT_DICTIONARY)
}
if (dataType == DataTypes.TIMESTAMP &&
!highCardinalityDims.contains(colName) && useDictionaryEncoding) {
encoders.add(Encoding.DIRECT_DICTIONARY)
}
}
columnSchema.setEncodingList(encoders)
val colUniqueIdGenerator = ColumnUniqueIdGenerator.getInstance
val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
columnSchema.setColumnUniqueId(columnUniqueId)
columnSchema.setColumnReferenceId(columnUniqueId)
columnSchema.setDimensionColumn(isDimensionCol)
columnSchema.setPrecision(field.precision)
columnSchema.setScale(field.scale)
columnSchema.setSchemaOrdinal(field.schemaOrdinal)
columnSchema.setSortColumn(false)
if (isVarcharColumn(colName)) {
columnSchema.setDataType(DataTypes.VARCHAR)
}
if (isParentColumnRelation) {
val dataMapField = map.get.get(field).get
columnSchema.setFunction(dataMapField.aggregateFunction)
val columnRelationList = dataMapField.columnTableRelationList.get
val parentColumnTableRelationList = new util.ArrayList[ParentColumnTableRelation]
columnRelationList.foreach {
columnRelation =>
val relationIdentifier = new RelationIdentifier(
columnRelation.parentDatabaseName,
columnRelation.parentTableName,
columnRelation.parentTableId)
if (cm.parentTable.isDefined) {
relationIdentifier.setTablePath(cm.parentTable.get.getTablePath)
}
val parentColumnTableRelation = new ParentColumnTableRelation(
relationIdentifier,
columnRelation.parentColumnId,
columnRelation.parentColumnName)
parentColumnTableRelationList.add(parentColumnTableRelation)
}
columnSchema.setParentColumnTableRelations(parentColumnTableRelationList)
}
// TODO: Need to fill RowGroupID, converted type
// & Number of Children after DDL finalization
if (field.columnComment.nonEmpty) {
var columnProperties = columnSchema.getColumnProperties
if (columnProperties == null) {
columnProperties = new util.HashMap[String, String]()
columnSchema.setColumnProperties(columnProperties)
}
columnProperties.put(CarbonCommonConstants.COLUMN_COMMENT, field.columnComment)
}
columnSchema
}

// process the CREATE TABLE DDL fields and build the wrapper TableInfo object
def process: TableInfo = {
val LOGGER = LogServiceFactory.getLogService(TableNewProcessor.getClass.getName)
var allColumns = Seq[ColumnSchema]()
var index = 0
var measureCount = 0
// Sort columns should be at the beginning of all columns
cm.sortKeyDims.get.foreach { keyDim =>
val field = cm.dimCols.find(keyDim equals _.column).get
val encoders = if (getEncoderFromParent(field)) {
cm.parentTable.get.getColumnByName(
cm.dataMapRelation.get(field).columnTableRelationList.get.head.parentColumnName
).getEncoder
} else {
val encoders = new java.util.ArrayList[Encoding]()
encoders.add(Encoding.DICTIONARY)
encoders
}
val columnSchema = getColumnSchema(
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
field.name.getOrElse(field.column),
encoders,
true,
field,
cm.dataMapRelation)
columnSchema.setSortColumn(true)
allColumns :+= columnSchema
index = index + 1
}
val dictionaryIncludeCols = cm.tableProperties
.getOrElse(CarbonCommonConstants.DICTIONARY_INCLUDE, "")
def addDimensionCol(field: Field): Unit = {
val sortField = cm.sortKeyDims.get.find(field.column equals _)
if (sortField.isEmpty) {
val encoders = if (getEncoderFromParent(field)) {
cm.parentTable.get.getColumnByName(cm.dataMapRelation.get.get(field).get.
columnTableRelationList.get(0).parentColumnName).getEncoder
} else {
val encoders = new java.util.ArrayList[Encoding]()
encoders.add(Encoding.DICTIONARY)
encoders
}
val columnSchema = getColumnSchema(
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
field.name.getOrElse(field.column),
encoders,
true,
field,
cm.dataMapRelation)
allColumns :+= columnSchema
index = index + 1
if (field.children.isDefined && field.children.get != null) {
val includeDictionaryEncoding = dictionaryIncludeCols.contains(field.column)
if (!includeDictionaryEncoding) {
columnSchema.getEncodingList.remove(Encoding.DICTIONARY)
}
columnSchema.setNumberOfChild(field.children.get.size)
allColumns ++= getAllChildren(field.children, includeDictionaryEncoding)
}
}
}
// add all dimensions
cm.dimCols.foreach(addDimensionCol(_))
// check whether each column is a local dictionary column and set it in the column schema
if (null != cm.tableProperties) {
CarbonUtil
.setLocalDictColumnsToWrapperSchema(allColumns.asJava,
cm.tableProperties.asJava,
cm.tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE))
}
cm.msrCols.foreach { field =>
// if an aggregate function is defined (pre-aggregate case) and the function is
// sum, avg or count, then the column can be stored as a measure
var isAggFunPresent = false
// get the encoder from the main table so that whatever encoding is applied there
// can also be applied to the aggregate table
val encoders = if (getEncoderFromParent(field)) {
isAggFunPresent =
cm.dataMapRelation.get(field).aggregateFunction.equalsIgnoreCase("sum") ||
cm.dataMapRelation.get(field).aggregateFunction.equals("avg") ||
cm.dataMapRelation.get(field).aggregateFunction.equals("count")
if (!isAggFunPresent) {
cm.parentTable.get.getColumnByName(
cm.dataMapRelation.get(field).columnTableRelationList.get.head.parentColumnName
).getEncoder
} else {
new java.util.ArrayList[Encoding]()
}
} else {
new java.util.ArrayList[Encoding]()
}
// check if it can be dimension column
val isDimColumn = !encoders.isEmpty && !isAggFunPresent
val columnSchema = getColumnSchema(
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
field.name.getOrElse(field.column),
encoders,
isDimColumn,
field,
cm.dataMapRelation)
allColumns :+= columnSchema
index = index + 1
if (!isDimColumn) {
measureCount += 1
}
}
// Check whether there are any duplicate measures or dimensions,
// based on the dimension and measure names.
allColumns.groupBy(_.getColumnName).foreach { f =>
if (f._2.size > 1) {
val name = f._1
CarbonException.analysisException(s"Duplicate column found with name: $name")
}
}
val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
// Set the useInvertedIndex flag in the column schema only when no parent table is
// defined; for child (pre-aggregate) tables the encoding is already decided above
if (cm.parentTable.isEmpty) {
val noInvertedIndexCols = cm.noInvertedIdxCols.getOrElse(Seq())
val invertedIndexCols = cm.innvertedIdxCols.getOrElse(Seq())
LOGGER.info("NO_INVERTED_INDEX columns are: " + noInvertedIndexCols.mkString(","))
for (column <- allColumns) {
// Set useInvertedIndex to true only for dimension columns explicitly listed
// in INVERTED_INDEX; measures and all other columns are set to false.
if (invertedIndexCols.contains(column.getColumnName) &&
!cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) {
column.setUseInvertedIndex(true)
} else {
column.setUseInvertedIndex(false)
}
}
}
// Adding dummy measure if no measure is provided
if (measureCount == 0) {
val encoders = new java.util.ArrayList[Encoding]()
val field = Field(
CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
Some(DataTypes.DOUBLE.getName),
Some(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE),
None
)
val columnSchema: ColumnSchema = getColumnSchema(
DataTypes.DOUBLE,
CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
encoders,
false,
field,
cm.dataMapRelation)
columnSchema.setInvisible(true)
allColumns :+= columnSchema
}
val columnValidator = CarbonSparkFactory.getCarbonColumnValidator
columnValidator.validateColumns(allColumns)
val tableInfo = new TableInfo()
val tableSchema = new TableSchema()
val schemaEvol = new SchemaEvolution()
schemaEvol.setSchemaEvolutionEntryList(new util.ArrayList[SchemaEvolutionEntry]())
tableSchema.setTableId(UUID.randomUUID().toString)
// populate table properties map
val tablePropertiesMap = new java.util.HashMap[String, String]()
cm.tableProperties.foreach {
x => tablePropertiesMap.put(x._1, x._2)
}
// Add table comment to table properties
tablePropertiesMap.put("comment", cm.tableComment.getOrElse(""))
val badRecordsPath = getBadRecordsPath(tablePropertiesMap,
cm.tableName,
tableSchema.getTableId,
cm.databaseNameOp.getOrElse("default"))
tablePropertiesMap.put("bad_record_path", badRecordsPath)
tableSchema.setTableProperties(tablePropertiesMap)
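// A hypothetical DDL that would populate bucketFields:
// CREATE TABLE t(id INT, name STRING) STORED BY 'carbondata'
// TBLPROPERTIES('BUCKETNUMBER'='4', 'BUCKETCOLUMNS'='name')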
if (cm.bucketFields.isDefined) {
val bucketCols = cm.bucketFields.get.bucketColumns.map { b =>
val col = allColumns.find(_.getColumnName.equalsIgnoreCase(b))
col match {
case Some(colSchema: ColumnSchema) =>
if (colSchema.isDimensionColumn && !colSchema.getDataType.isComplexType) {
colSchema
} else {
LOGGER.error(s"Bucket field must be dimension column and " +
s"should not be measure or complex column: ${ colSchema.getColumnName }")
CarbonException.analysisException(s"Bucket field must be dimension column and " +
s"should not be measure or complex column: ${
colSchema
.getColumnName
}")
}
case _ =>
LOGGER.error(s"Bucket field is not present in table columns")
CarbonException.analysisException(s"Bucket field is not present in table columns")
}
}
tableSchema.setBucketingInfo(
new BucketingInfo(bucketCols.asJava, cm.bucketFields.get.numberOfBuckets))
}
if (cm.partitionInfo.isDefined) {
val partitionInfo = cm.partitionInfo.get
val partitionColumnSchema = partitionInfo.getColumnSchemaList.asScala
val partitionCols = allColumns.filter { column =>
partitionColumnSchema.exists(_.getColumnName.equalsIgnoreCase(column.getColumnName))
}
val orderCols =
partitionColumnSchema.map(
f => partitionCols.find(_.getColumnName.equalsIgnoreCase(f.getColumnName)).get).asJava
partitionInfo.setColumnSchemaList(orderCols)
tableSchema.setPartitionInfo(partitionInfo)
}
tableSchema.setTableName(cm.tableName)
tableSchema.setListOfColumns(allColumns.asJava)
tableSchema.setSchemaEvolution(schemaEvol)
tableInfo.setDatabaseName(cm.databaseNameOp.getOrElse(null))
tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(cm.databaseNameOp.getOrElse(null),
cm.tableName))
tableInfo.setLastUpdatedTime(System.currentTimeMillis())
tableInfo.setFactTable(tableSchema)
tableInfo
}
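
/**
 * Resolves the bad-records path for the table. For a hypothetical table
 * `default.sales` with table id `<uuid>` and a configured (non-empty) base path,
 * this yields `<bad_record_path>/default/sales_<uuid>`; otherwise it falls back
 * to the default bad-records location.
 */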
private def getBadRecordsPath(tablePropertiesMap: util.HashMap[String, String],
tableName: String,
tableId: String,
databaseName: String): String = {
val badRecordsPath = tablePropertiesMap.asScala
.getOrElse("bad_record_path", CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)
if (badRecordsPath == null || badRecordsPath.isEmpty) {
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL
} else {
badRecordsPath + CarbonCommonConstants.FILE_SEPARATOR + databaseName +
CarbonCommonConstants.FILE_SEPARATOR + s"${tableName}_$tableId"
}
}

/**
 * Checks whether the encoder should be taken from the parent table
 *
 * @param field column field
 * @return true if the encoder should be taken from the parent table
 */
private def getEncoderFromParent(field: Field): Boolean = {
cm.parentTable.isDefined &&
cm.dataMapRelation.get.get(field).isDefined &&
cm.dataMapRelation.get.get(field).get.columnTableRelationList.size == 1
}

// Checks whether the specified column group columns are present in the field list.
protected def checkColGroupsValidity(colGrps: Seq[String],
allCols: Seq[ColumnSchema],
highCardCols: Seq[String]): Unit = {
if (null != colGrps) {
colGrps.foreach(columngroup => {
val rowCols = columngroup.split(",")
rowCols.foreach(colForGrouping => {
var found: Boolean = false
// check for dimensions + measures
allCols.foreach(eachCol => {
if (eachCol.getColumnName.equalsIgnoreCase(colForGrouping.trim())) {
found = true
}
})
// check the no-dictionary dimensions
highCardCols.foreach(noDicCol => {
if (colForGrouping.trim.equalsIgnoreCase(noDicCol)) {
found = true
}
})
if (!found) {
CarbonException.analysisException(
s"column $colForGrouping is not present in Field list")
}
})
})
}
}
}