/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.view

import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.Breaks.{break, breakable}

import org.apache.log4j.Logger
import org.apache.spark.sql.{CarbonEnv, CarbonSource, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CarbonParserUtil, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Coalesce, Expression, Literal, ScalaUDF}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average}
import org.apache.spark.sql.catalyst.plans.logical.{Join, Limit, LogicalPlan, Sort}
import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, Field, PartitionerField, TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.parser.MVQueryParser
import org.apache.spark.sql.types.{ArrayType, DateType, MapType, StructType}

import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedMVCommandException}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.view._
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, SimpleModularizer}
import org.apache.carbondata.mv.plans.util.{BirdcageOptimizer, SQLBuilder}
import org.apache.carbondata.spark.util.CommonUtil
import org.apache.carbondata.view._

/**
* Create Materialized View Command implementation.
* It will create the MV table, load the MV table (if deferred refresh is false),
* and register the MV schema in [[MVManager]].
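*
* Example (illustrative SQL, not taken from this file):
* {{{
*   CREATE MATERIALIZED VIEW mv1 AS SELECT a, sum(b) FROM source_table GROUP BY a
* }}}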
*/
case class CarbonCreateMVCommand(
databaseNameOption: Option[String],
name: String,
properties: Map[String, String],
queryString: String,
ifNotExistsSet: Boolean = false,
deferredRefresh: Boolean = false)
extends AtomicRunnableCommand {
private val logger = CarbonCreateMVCommand.LOGGER
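// Set in processMetadata and consumed by processData to trigger the first refresh.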
private var viewSchema: MVSchema = _
override def processMetadata(session: SparkSession): Seq[Row] = {
setAuditInfo(Map("mvName" -> name) ++ properties)
checkProperties(mutable.Map[String, String](properties.toSeq: _*))
val viewManager = MVManagerInSpark.get(session)
val databaseName = databaseNameOption.getOrElse(session.sessionState.catalog.getCurrentDatabase)
if (viewManager.getSchema(databaseName, name) != null) {
if (!ifNotExistsSet) {
throw new MalformedMVCommandException(
s"Materialized view with name ${ databaseName }.${ name } already exists")
} else {
return Seq.empty
}
}
val identifier = TableIdentifier(name, Option(databaseName))
val databaseLocation = viewManager.getDatabaseLocation(databaseName)
val systemDirectoryPath = CarbonProperties.getInstance()
.getSystemFolderLocationPerDatabase(FileFactory
.getCarbonFile(databaseLocation)
.getCanonicalPath)
val operationContext: OperationContext = new OperationContext()
OperationListenerBus.getInstance().fireEvent(
CreateMVPreExecutionEvent(session, systemDirectoryPath, identifier),
operationContext)
val catalogFactory = new MVCatalogFactory[MVSchemaWrapper] {
override def newCatalog(): MVCatalog[MVSchemaWrapper] = {
new MVCatalogInSpark(session)
}
}
// get mv catalog
var viewCatalog = viewManager.getCatalog(catalogFactory, false)
.asInstanceOf[MVCatalogInSpark]
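// If the cached catalog was created for a different session, rebuild it for the current session.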
if (!viewCatalog.session.equals(session)) {
viewCatalog = viewManager.getCatalog(catalogFactory, true)
.asInstanceOf[MVCatalogInSpark]
}
val schema = doCreate(session, identifier, viewManager, viewCatalog)
try {
viewCatalog.registerSchema(schema)
if (schema.isRefreshOnManual) {
viewManager.setStatus(schema.getIdentifier, MVStatus.DISABLED)
}
} catch {
case exception: Exception =>
val dropTableCommand = CarbonDropTableCommand(
ifExistsSet = true,
Option(databaseName),
name,
dropChildTable = true)
dropTableCommand.run(session)
viewManager.deleteSchema(databaseName, name)
throw exception
}
OperationListenerBus.getInstance().fireEvent(
CreateMVPostExecutionEvent(session, systemDirectoryPath, identifier),
operationContext)
this.viewSchema = schema
Seq.empty
}
override def processData(session: SparkSession): Seq[Row] = {
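// For a non-deferred mv, load the data immediately and enable the mv for query rewrite.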
if (this.viewSchema != null && !deferredRefresh) {
MVRefresher.refresh(this.viewSchema, session)
val viewManager = MVManagerInSpark.get(session)
viewManager.setStatus(this.viewSchema.getIdentifier, MVStatus.ENABLED)
}
Seq.empty
}
override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
if (this.viewSchema != null) {
CarbonDropMVCommand(databaseNameOption, name, ifExistsSet = true).run(
sparkSession)
}
Seq.empty
}
override protected def opName: String = "CREATE MATERIALIZED VIEW"
private def doCreate(session: SparkSession,
tableIdentifier: TableIdentifier,
viewManager: MVManagerInSpark,
viewCatalog: MVCatalogInSpark): MVSchema = {
val logicalPlan = MVHelper.dropDummyFunction(
MVQueryParser.getQueryPlan(queryString, session))
// check if mv with same query already exists
val mvSchemaWrapper = viewCatalog.getMVWithSameQueryPresent(logicalPlan)
if (mvSchemaWrapper.nonEmpty) {
val mvWithSameQuery = mvSchemaWrapper.get.viewSchema.getIdentifier.getTableName
throw new MalformedMVCommandException(
s"MV with the name `$mvWithSameQuery` has been already created with the same query")
}
val modularPlan = checkQuery(logicalPlan)
val viewSchema = getOutputSchema(logicalPlan)
val relatedTables = getRelatedTables(logicalPlan)
val relatedTableList = toCarbonTables(session, relatedTables)
val inputCols = logicalPlan.output.map(x =>
x.name
).toList
val relatedTableNames = new util.ArrayList[String](relatedTableList.size())
// Validate the related tables and check whether a load is in progress on any of them
relatedTableList.asScala.foreach {
table =>
val tableProperties = table.getTableInfo.getFactTable.getTableProperties.asScala
// validate for spatial index column
val spatialProperty = tableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
if (spatialProperty.isDefined) {
val spatialColumn = spatialProperty.get.trim
if (inputCols.contains(spatialColumn)) {
val errorMessage =
s"$spatialColumn is a spatial index column and is not allowed for " +
s"the option(s): MATERIALIZED VIEW"
throw new MalformedCarbonCommandException(errorMessage)
}
}
if (!table.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException(
"Cannot create mv on non-transactional table")
}
if (table.isMV) {
throw new MalformedCarbonCommandException(
"Cannot create mv on mv table " + table.getTableUniqueName)
}
if (table.isStreamingSink) {
throw new MalformedCarbonCommandException(
"Cannot create mv on stream table " + table.getTableUniqueName)
}
if (SegmentStatusManager.isLoadInProgressInTable(table)) {
throw new UnsupportedOperationException(
"Cannot create mv when insert is in progress on table " + table.getTableUniqueName)
}
relatedTableNames.add(table.getTableName)
}
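// Average aggregates, aggregates nested inside other expressions, joins and non-carbon
// related tables all force full refresh (see checkIsQueryNeedFullRefresh and
// checkIsHasNonCarbonTable below).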
val viewRefreshMode = if (checkIsQueryNeedFullRefresh(logicalPlan) ||
checkIsHasNonCarbonTable(relatedTables)) {
MVProperty.REFRESH_MODE_FULL
} else {
MVProperty.REFRESH_MODE_INCREMENTAL
}
val viewRefreshTriggerMode = if (deferredRefresh) {
MVProperty.REFRESH_TRIGGER_MODE_ON_MANUAL
} else {
properties.getOrElse(MVProperty.REFRESH_TRIGGER_MODE,
MVProperty.REFRESH_TRIGGER_MODE_ON_COMMIT)
}
val viewProperties = mutable.Map[String, String]()
viewProperties.put(
CarbonCommonConstants.MV_RELATED_TABLES, relatedTableNames.asScala.mkString(","))
val (timeSeriesColumn, granularity) = checkTimeSeriesQuery(logicalPlan, viewRefreshTriggerMode)
val fieldsMap = MVHelper.getFieldsMapFromPlan(
new SQLBuilder(modularPlan).SQLizer.execute(modularPlan), getLogicalRelation(logicalPlan))
// If the mv is mapped to a single main table, then inherit the table properties from that
// table, else use default table properties. If the mv properties contain table properties,
// they override the inherited ones.
if (relatedTableList.size() == 1 && CarbonSource.isCarbonDataSource(relatedTables.head)) {
inheritTablePropertiesFromRelatedTable(
relatedTableList.get(0),
fieldsMap,
viewSchema,
viewProperties)
if (granularity != null) {
val timeSeriesDataType = relatedTableList.get(0).getTableInfo.getFactTable
.getListOfColumns.asScala
.filter(column => column.getColumnName.equalsIgnoreCase(timeSeriesColumn))
.head.getDataType
if (timeSeriesDataType.equals(DataTypes.DATE) ||
timeSeriesDataType.equals(DataTypes.TIMESTAMP)) {
// if data type is of Date type, then check if given granularity is valid for date type
if (timeSeriesDataType.equals(DataTypes.DATE)) {
checkTimeSeriesGranularityForDate(granularity)
}
} else {
throw new MalformedCarbonCommandException(
"TimeSeries Column must be of TimeStamp or Date type")
}
}
}
properties.foreach(t => viewProperties.put(t._1, t._2))
// TODO: support partition for mv table
// Inherit partition from related table if mv is mapped to single related table
val viewPartitionerFields = if (relatedTableList.size() == 1) {
val relatedTablePartitionColumns =
if (properties.getOrElse("partitioning", "true").toBoolean &&
relatedTableList.get(0).isHivePartitionTable) {
relatedTableList.get(0).getPartitionInfo
.getColumnSchemaList.asScala.map(_.getColumnName)
} else {
Seq.empty
}
getViewPartitionerFields(relatedTablePartitionColumns, fieldsMap)
} else {
Seq.empty
}
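// When the mv table is partitioned, remember the original projection order of each
// output column, since partition columns are moved to the end of the created table schema.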
val columnOrderMap = new java.util.HashMap[Integer, String]()
if (viewPartitionerFields.nonEmpty) {
viewSchema.zipWithIndex.foreach {
case (viewField, index) =>
columnOrderMap.put(index, viewField.column)
}
}
// prepare table model of the collected tokens
val viewTableModel: TableModel = CarbonParserUtil.prepareTableModel(
ifNotExistPresent = ifNotExistsSet,
CarbonParserUtil.convertDbNameToLowerCase(tableIdentifier.database),
tableIdentifier.table.toLowerCase,
viewSchema,
viewPartitionerFields,
viewProperties,
None,
isAlterFlow = false,
None)
val viewTablePath = if (viewProperties.contains("path")) {
viewProperties("path")
} else {
CarbonEnv.getTablePath(viewTableModel.databaseNameOp, viewTableModel.tableName)(session)
}
CarbonCreateTableCommand(TableNewProcessor(viewTableModel),
viewTableModel.ifNotExistsSet, Some(viewTablePath), isVisible = false).run(session)
// Build and create mv schema
// Map the main table columns to the corresponding MV table columns and record the
// mapping in the mv schema
val relatedTableToColumnsMap = new java.util.HashMap[String, util.Set[String]]()
for (viewField <- fieldsMap.values) {
viewField.relatedFieldList.foreach {
relatedField =>
if (null == relatedTableToColumnsMap.get(relatedField.tableName)) {
val columns = new util.HashSet[String]()
columns.add(relatedField.fieldName.toLowerCase())
relatedTableToColumnsMap.put(relatedField.tableName, columns)
} else {
relatedTableToColumnsMap.get(relatedField.tableName)
.add(relatedField.fieldName.toLowerCase())
}
}
}
val relatedTableIds = relatedTables.map { table =>
val relatedTableId = new RelationIdentifier(table.database, table.identifier.table, "")
relatedTableId.setTablePath(FileFactory.getUpdatedFilePath(table.location.toString))
relatedTableId.setProvider(table.provider.get)
relatedTableId
}
val viewIdentifier = new RelationIdentifier(
tableIdentifier.database.get, tableIdentifier.table,
CarbonEnv.getCarbonTable(tableIdentifier)(session).getTableId)
viewIdentifier.setTablePath(viewTablePath)
val schema = new MVSchema(viewManager)
schema.setIdentifier(viewIdentifier)
schema.setProperties(mutable.Map[String, String](viewProperties.toSeq: _*).asJava)
schema.setRelatedTableColumnList(relatedTableToColumnsMap)
schema.setColumnsOrderMap(columnOrderMap)
schema.setRelatedTables(new util.ArrayList[RelationIdentifier](relatedTableIds.asJava))
schema.getProperties.put(MVProperty.REFRESH_MODE, viewRefreshMode)
schema.getProperties.put(MVProperty.REFRESH_TRIGGER_MODE, viewRefreshTriggerMode)
if (null != granularity && null != timeSeriesColumn) {
schema.setQuery(queryString)
schema.setTimeSeries(true)
} else {
schema.setQuery(modularPlan.asCompactSQL)
}
try {
viewManager.createSchema(schema.getIdentifier.getDatabaseName, schema)
} catch {
case exception: Exception =>
val dropTableCommand = CarbonDropTableCommand(
ifExistsSet = true,
Option(schema.getIdentifier.getDatabaseName),
schema.getIdentifier.getTableName,
dropChildTable = true,
isInternalCall = true)
dropTableCommand.run(session)
throw exception
}
schema
}
private def toCarbonTables(session: SparkSession,
catalogTables: Seq[CatalogTable]): util.List[CarbonTable] = {
val tableList = new util.ArrayList[CarbonTable](catalogTables.size)
catalogTables.foreach { catalogTable =>
val table = CarbonEnv.getAnyTable(
catalogTable.identifier.database, catalogTable.identifier.table)(session)
tableList.add(table)
}
tableList
}
// Return all relations involved in the plan
private def getRelatedTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
logicalPlan.collect {
case relation: LogicalRelation => relation.catalogTable.get
case relation: HiveTableRelation => relation.tableMeta
}
}
private def getLogicalRelation(logicalPlan: LogicalPlan): Seq[LogicalRelation] = {
logicalPlan.collect {
case l: LogicalRelation => l
}
}
private def getOutputSchema(logicalPlan: LogicalPlan): Seq[Field] = {
var attributeIndex = 0
logicalPlan.output.map { attribute =>
if (attribute.dataType.isInstanceOf[ArrayType] ||
attribute.dataType.isInstanceOf[StructType] ||
attribute.dataType.isInstanceOf[MapType]) {
throw new UnsupportedOperationException(
s"Materialized view is not supported for complex datatype columns and " +
s"complex datatype return types of function :" + attribute.name)
}
val attributeName = MVHelper.getUpdatedColumnName(attribute, attributeIndex)
attributeIndex += 1
val rawSchema = '`' + attributeName + '`' + ' ' + attribute.dataType.typeName
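// Decimal types carry precision and scale, which must be parsed from the catalog
// string (e.g. "decimal(10,2)") and set on the Field explicitly.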
if (attribute.dataType.typeName.startsWith("decimal")) {
val (precision, scale) = CommonUtil.getScaleAndPrecision(attribute.dataType.catalogString)
Field(column = attributeName,
dataType = Some(attribute.dataType.typeName),
name = Some(attributeName),
children = None,
precision = precision,
scale = scale,
rawSchema = rawSchema)
} else {
Field(column = attributeName,
dataType = Some(attribute.dataType.typeName),
name = Some(attributeName),
children = None,
rawSchema = rawSchema)
}
}.distinct
}
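/**
* Map the given related-table column names to the corresponding mv columns, keeping only
* mv columns that are a direct (non-aggregated) projection of exactly one related field.
*/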
private def getViewColumns(
relatedTableColumns: Array[String],
fieldsMap: scala.collection.mutable.LinkedHashMap[Field, MVField],
viewSchema: Seq[Field]) = {
val viewColumns = relatedTableColumns.flatMap(
relatedTableColumn =>
viewSchema.collect {
case viewField
if fieldsMap(viewField).aggregateFunction.isEmpty &&
fieldsMap(viewField).relatedFieldList.size == 1 &&
fieldsMap(viewField).relatedFieldList.head.fieldName
.equalsIgnoreCase(relatedTableColumn) =>
viewField.column
})
viewColumns
}
/**
* Used to extract PartitionerFields for the MV.
* This method keeps generating partitionerFields until the sequence of
* partition columns is broken.
*
* For example: if x, y, z are partition columns in the main table, then the MV table will
* be partitioned only if its projection contains one of "x,y,z", "x,y" or "x" as the
* projection columns.
*/
private def getViewPartitionerFields(relatedTablePartitionColumns: Seq[String],
fieldsMap: mutable.LinkedHashMap[Field, MVField]): Seq[PartitionerField] = {
@scala.annotation.tailrec
def generatePartitionerField(partitionColumn: List[String],
partitionerFields: Seq[PartitionerField]): Seq[PartitionerField] = {
partitionColumn match {
case head :: tail =>
// Collect the first relation which matches the condition
val validField = fieldsMap.zipWithIndex.collectFirst {
case ((field, viewField), _) if
viewField.relatedFieldList.nonEmpty &&
head.equals(viewField.relatedFieldList.head.fieldName) &&
viewField.aggregateFunction.isEmpty =>
(PartitionerField(field.name.get,
field.dataType,
field.columnComment), relatedTablePartitionColumns.indexOf(head))
}
if (validField.isDefined) {
val (partitionerField, index) = validField.get
// If a relation is found, check whether the number of partitionerFields already found
// equals the index of this element.
// If x with index 1 is found, then exactly 1 element should already have been found.
// If z with index 2 comes directly after x, then this check will be false, as 1
// element was skipped in between: the index would be 2 while the number of elements
// found would be 1. In that case return an empty sequence so that the MV table is not
// partitioned on any column.
if (index == partitionerFields.length) {
generatePartitionerField(tail, partitionerFields :+ partitionerField)
} else {
Seq.empty
}
} else {
// if not found then continue search for the rest of the elements. Because the rest
// of the elements can also decide if the table has to be partitioned or not.
generatePartitionerField(tail, partitionerFields)
}
case Nil =>
// if end of list then return fields.
partitionerFields
}
}
generatePartitionerField(relatedTablePartitionColumns.toList, Seq.empty)
}
private def checkQuery(logicalPlan: LogicalPlan): ModularPlan = {
// if there is a limit in the query string, throw an exception, as it's not a valid use case
logicalPlan match {
case Limit(_, _) =>
throw new MalformedCarbonCommandException("Materialized view does not support the query " +
"with limit")
case _ =>
}
// Order by columns need to be present in the projection list when creating an mv, because
// the order by has to be performed on all segments during query, which requires the order by
// column data.
logicalPlan.transform {
case sort@Sort(order, _, _) =>
order.map { orderByCol =>
orderByCol.child match {
case attr: AttributeReference =>
if (!logicalPlan.output.contains(attr.toAttribute)) {
throw new UnsupportedOperationException(
"Order by column `" + attr.name + "` must be present in project columns")
}
case _ =>
}
order
}
sort
}
val modularPlan =
SimpleModularizer.modularize(BirdcageOptimizer.execute(logicalPlan)).next().semiHarmonized
// Only SPJGH queries are supported: select, predicate (filter), join, group by and having.
if (!modularPlan.isSPJGH) {
throw new UnsupportedOperationException("MV is not supported for this query")
}
val isValid = modularPlan match {
case groupBy: GroupBy =>
// Make sure all predicates are present in projections.
groupBy.predicateList.forall {
predicate =>
groupBy.outputList.exists {
case alias: Alias =>
alias.semanticEquals(predicate) || alias.child.semanticEquals(predicate)
case other => other.semanticEquals(predicate)
}
}
case _ => true
}
if (!isValid) {
throw new UnsupportedOperationException(
"Group by columns must be present in project columns")
}
var expressionValid = true
modularPlan.transformExpressions {
case coalesce@Coalesce(_) if coalesce.children.exists(
expression => expression.isInstanceOf[AggregateExpression]) =>
expressionValid = false
coalesce
}
if (!expressionValid) {
throw new UnsupportedOperationException("MV doesn't support Coalesce")
}
modularPlan
}
private def checkProperties(properties: mutable.Map[String, String]): Unit = {
// TODO: checking against a whitelist of supported properties would be better.
if (properties.contains("streaming") && properties("streaming").equalsIgnoreCase("true")) {
throw new MalformedCarbonCommandException(
s"Materialized view does not support streaming")
}
val unsupportedPropertyKeys = Array(
"sort_columns",
"local_dictionary_include",
"local_dictionary_exclude",
"long_string_columns",
"no_inverted_index",
"inverted_index",
"column_meta_cache",
"range_column")
val unsupportedProperties = properties.filter(
property => unsupportedPropertyKeys.exists(key => key.equalsIgnoreCase(property._1)))
if (unsupportedProperties.nonEmpty) {
throw new MalformedMVCommandException(
"Properties " + unsupportedProperties.keySet.mkString(",") +
" are not allowed for this materialized view")
}
}
/**
* Return true if the mv table needs a full refresh, i.e. incremental load is not possible,
* based on the query plan. Some cases, such as aggregation functions nested inside other
* expressions like sum(a)+sum(b), cannot be incrementally loaded.
*/
private def checkIsQueryNeedFullRefresh(logicalPlan: LogicalPlan): Boolean = {
var needFullRefresh = false
logicalPlan.transformAllExpressions {
case alias: Alias => alias
case aggregate: AggregateExpression =>
// If an average function is present, then go for full refresh
val reload = aggregate.aggregateFunction match {
case _: Average => true
case _ => false
}
needFullRefresh = reload || needFullRefresh
aggregate
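// Cast is matched here, before the generic Expression case below; as written, the find
// never matches, so a cast directly over an aggregate does not force a full refresh.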
case cast: Cast =>
needFullRefresh = cast.child.find {
case _: AggregateExpression => false
case _ => false
}.isDefined || needFullRefresh
cast
case expression: Expression =>
// Check whether any aggregation function is present inside another expression.
needFullRefresh = expression.find {
case _: AggregateExpression => true
case _ => false
}.isDefined || needFullRefresh
expression
}
// TODO: remove this case when incremental data loading is supported for multiple tables
logicalPlan.transformDown {
case join@Join(_, _, _, _) =>
needFullRefresh = true
join
}
needFullRefresh
}
/**
* Return true if any of the related tables is not a carbon data source, in which case
* incremental load on the mv table is not possible.
*/
private def checkIsHasNonCarbonTable(mainTables: Seq[CatalogTable]): Boolean = {
mainTables.exists(table => !CarbonSource.isCarbonDataSource(table))
}
/**
* Validate an mv timeseries query for the timeseries column and granularity.
* The timeseries UDF takes a Timestamp column directly, or a Date column cast to Timestamp.
*
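* Example (illustrative):
*   SELECT timeseries(event_time, 'hour'), sum(x) FROM t GROUP BY timeseries(event_time, 'hour')
*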
* @param logicalPlan to be validated
* @param viewRefreshTriggerModel to check whether it is a lazy/non-lazy mv
* @return the timeseries column name and granularity, or nulls if it is not a timeseries query
*/
private def checkTimeSeriesQuery(logicalPlan: LogicalPlan,
viewRefreshTriggerModel: String): (String, String) = {
var timeSeriesColumn: String = null
var granularity: String = null
logicalPlan.transformExpressions {
case alias@Alias(function: ScalaUDF, _) =>
if (function.function.isInstanceOf[TimeSeriesFunction]) {
if (null == timeSeriesColumn && null == granularity) {
function.children.collect {
case reference: AttributeReference =>
timeSeriesColumn = reference.name
case literal: Literal =>
granularity = literal.value.toString
case cast: Cast =>
cast.child match {
case reference: AttributeReference =>
if (reference.dataType.isInstanceOf[DateType]) {
timeSeriesColumn = reference.name
}
case _ =>
}
}
} else {
function.children.collect {
case reference: AttributeReference =>
if (!reference.name.equalsIgnoreCase(timeSeriesColumn)) {
throw new MalformedCarbonCommandException(
"Multiple timeseries udf functions are defined in Select statement with " +
"different timestamp columns")
}
case literal: Literal =>
if (!granularity.equalsIgnoreCase(literal.value.toString)) {
throw new MalformedCarbonCommandException(
"Multiple timeseries udf functions are defined in Select statement with " +
"different granularities")
}
}
}
}
alias
}
// if the timeseries column and granularity are not null, then validate
if (null != timeSeriesColumn && null != granularity) {
if (viewRefreshTriggerModel.equalsIgnoreCase(
MVProperty.REFRESH_TRIGGER_MODE_ON_MANUAL)) {
throw new MalformedCarbonCommandException(
"MV TimeSeries queries does not support refresh on manual")
}
checkTimeSeriesGranularity(granularity)
} else if (null == timeSeriesColumn && null != granularity) {
throw new MalformedCarbonCommandException(
"MV TimeSeries is only supported on Timestamp/Date column")
}
(timeSeriesColumn, granularity)
}
/**
* validate TimeSeries Granularity
*
* @param timeSeriesFunction user defined granularity
*/
private def checkTimeSeriesGranularity(timeSeriesFunction: String): Unit = {
val found = MVTimeGranularity.getAll().exists(
granularity => timeSeriesFunction.equalsIgnoreCase(granularity.name))
if (!found) {
throw new MalformedCarbonCommandException("Granularity " + timeSeriesFunction + " is invalid")
}
}
private def checkTimeSeriesGranularityForDate(timeSeriesFunction: String): Unit = {
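// For Date columns, only day-or-coarser granularities are meaningful: anything finer
// than one day (24 * 60 * 60 seconds) is rejected.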
for (granularity <- MVTimeGranularity.getAll()) {
if (timeSeriesFunction.equalsIgnoreCase(granularity.name)) {
if (granularity.seconds < 24 * 60 * 60) {
throw new MalformedCarbonCommandException(
"Granularity should be of DAY/WEEK/MONTH/YEAR, for timeseries column of Date type")
}
}
}
}
private def inheritTablePropertiesFromRelatedTable(
relatedTable: CarbonTable,
fieldsMap: scala.collection.mutable.LinkedHashMap[Field, MVField],
viewSchema: Seq[Field],
viewProperties: mutable.Map[String, String]): Unit = {
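// Inherit sort columns: keep only mv columns that map directly (no aggregate) to
// exactly one sort column of the related table, preserving the sort column order.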
var viewTableOrder = Seq[String]()
val relatedTableOrder = relatedTable.getSortColumns.asScala
val relatedTableProperties =
relatedTable.getTableInfo.getFactTable.getTableProperties.asScala
relatedTableOrder.foreach(
relatedTableField =>
viewSchema.filter(
viewField =>
fieldsMap(viewField).aggregateFunction.isEmpty &&
fieldsMap(viewField).relatedFieldList.size == 1 &&
fieldsMap(viewField).relatedFieldList.head.fieldName.equalsIgnoreCase(
relatedTableField))
.foreach(viewField => viewTableOrder :+= viewField.column))
if (viewTableOrder.nonEmpty) {
viewProperties.put(CarbonCommonConstants.SORT_COLUMNS, viewTableOrder.mkString(","))
}
val sortScope = relatedTableProperties.get("sort_scope")
if (sortScope.isDefined) {
viewProperties.put("sort_scope", sortScope.get)
}
viewProperties
.put(CarbonCommonConstants.TABLE_BLOCKSIZE, relatedTable.getBlockSizeInMB.toString)
viewProperties.put(CarbonCommonConstants.FLAT_FOLDER,
relatedTableProperties.getOrElse(
CarbonCommonConstants.FLAT_FOLDER, CarbonCommonConstants.DEFAULT_FLAT_FOLDER))
// MV table name and columns are automatically prefixed with the parent table name
// in carbon. For convenience, users can use the column names from the select statement
// when configuring properties, and here we update those names with the prefix.
// If long_string_columns is not present in the mv properties, then inherit
// long_string_columns from the parent table.
var longStringColumn = viewProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS)
if (longStringColumn.isEmpty) {
val longStringColumnInRelatedTables = relatedTableProperties
.getOrElse(CarbonCommonConstants.LONG_STRING_COLUMNS, "").split(",").map(_.trim)
val varcharColumnInRelatedTables = scala.collection.mutable.ArrayBuffer.empty[String]
fieldsMap foreach (fields => {
val aggregateFunction = fields._2.aggregateFunction
val relatedFieldList = fields._2.relatedFieldList
// check if columns present in mv are long_string_col in parent table. If they are
// long_string_columns in parent, make them long_string_columns in mv
if (aggregateFunction.isEmpty &&
relatedFieldList.size == 1 &&
longStringColumnInRelatedTables.contains(relatedFieldList.head.fieldName)) {
varcharColumnInRelatedTables += relatedFieldList.head.fieldName
}
})
if (varcharColumnInRelatedTables.nonEmpty) {
longStringColumn = Option(varcharColumnInRelatedTables.mkString(","))
}
}
if (longStringColumn.isDefined) {
val fieldNames = viewSchema.map(_.column)
val newLongStringColumn = longStringColumn.get.split(",").map(_.trim).map {
columnName =>
val newColumnName = relatedTable.getTableName.toLowerCase() + "_" + columnName
if (!fieldNames.contains(newColumnName)) {
throw new MalformedCarbonCommandException(
CarbonCommonConstants.LONG_STRING_COLUMNS.toUpperCase() + ":" + columnName
+ " does not in mv")
}
newColumnName
}
viewProperties.put(CarbonCommonConstants.LONG_STRING_COLUMNS,
newLongStringColumn.mkString(","))
}
// inherit compressor property
viewProperties
.put(CarbonCommonConstants.COMPRESSOR,
relatedTable.getTableInfo.getFactTable.getTableProperties.asScala
.getOrElse(CarbonCommonConstants.COMPRESSOR,
CompressorFactory.getInstance().getCompressor.getName))
// inherit the local dictionary properties of main parent table
viewProperties
.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
relatedTable.getTableInfo.getFactTable.getTableProperties.asScala
.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, "false"))
viewProperties
.put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
relatedTable.getTableInfo.getFactTable.getTableProperties.asScala
.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT))
val parentDictInclude = relatedTable.getTableInfo.getFactTable.getTableProperties.asScala
.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, "").split(",")
val parentDictExclude = relatedTable.getTableInfo.getFactTable.getTableProperties.asScala
.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, "").split(",")
val newLocalDictInclude = getViewColumns(parentDictInclude, fieldsMap, viewSchema)
val newLocalDictExclude = getViewColumns(parentDictExclude, fieldsMap, viewSchema)
if (newLocalDictInclude.nonEmpty) {
viewProperties
.put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, newLocalDictInclude.mkString(","))
}
if (newLocalDictExclude.nonEmpty) {
viewProperties
.put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, newLocalDictExclude.mkString(","))
}
val parentInvertedIndex = relatedTable.getTableInfo.getFactTable.getTableProperties.asScala
.getOrElse(CarbonCommonConstants.INVERTED_INDEX, "").split(",")
val newInvertedIndex = getViewColumns(parentInvertedIndex, fieldsMap, viewSchema)
val parentNoInvertedIndex = relatedTable.getTableInfo.getFactTable.getTableProperties.asScala
.getOrElse(CarbonCommonConstants.NO_INVERTED_INDEX, "").split(",")
val newNoInvertedIndex =
getViewColumns(parentNoInvertedIndex, fieldsMap, viewSchema)
if (newInvertedIndex.nonEmpty) {
viewProperties.put(CarbonCommonConstants.INVERTED_INDEX, newInvertedIndex.mkString(","))
}
if (newNoInvertedIndex.nonEmpty) {
viewProperties.put(CarbonCommonConstants.NO_INVERTED_INDEX, newNoInvertedIndex.mkString(","))
}
}
}
object CarbonCreateMVCommand {
private val LOGGER: Logger = LogServiceFactory.getLogService(
classOf[CarbonCreateMVCommand].getCanonicalName)
}