/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.mv.datamap
import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, Limit, LogicalPlan, Project}
import org.apache.spark.sql.execution.command.{Field, PartitionerField, TableModel, TableNewProcessor}
import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
import org.apache.spark.sql.execution.command.timeseries.{TimeSeriesFunction, TimeSeriesUtil}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.types.{ArrayType, DateType, MapType, StructType}
import org.apache.spark.util.{DataMapUtil, PartitionUtils}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, RelationIdentifier}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.datamap.DataMapManager
import org.apache.carbondata.mv.plans.modular._
import org.apache.carbondata.mv.plans.util.SQLBuilder
import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite, SummaryDatasetCatalog}
import org.apache.carbondata.spark.util.CommonUtil
/**
* Utility for MV datamap operations.
*/
object MVHelper {
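/**
* Creates an MV datamap: validates the given CTAS query, creates the backing
* datamap table from the query's output schema, and registers the datamap
* schema against its parent (main) tables.
*/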
def createMVDataMap(sparkSession: SparkSession,
dataMapSchema: DataMapSchema,
queryString: String,
ifNotExistsSet: Boolean = false,
mainTable: CarbonTable): Unit = {
val dmProperties = dataMapSchema.getProperties.asScala
if (dmProperties.contains("streaming") && dmProperties("streaming").equalsIgnoreCase("true")) {
throw new MalformedCarbonCommandException(
s"MV datamap does not support streaming"
)
}
val mvUtil = new MVUtil
mvUtil.validateDMProperty(dmProperties)
val updatedQuery = new CarbonSpark2SqlParser().addMVSkipFunction(queryString)
val query = sparkSession.sql(updatedQuery)
val logicalPlan = MVHelper.dropDummyFunc(query.queryExecution.analyzed)
// if the MV CTAS query contains LIMIT, throw an exception, as it is not a valid use case
logicalPlan match {
case Limit(_, _) =>
throw new MalformedCarbonCommandException("MV datamap does not support the query with " +
"limit")
case _ =>
}
val selectTables = getTables(logicalPlan)
if (selectTables.isEmpty) {
throw new MalformedCarbonCommandException(
s"Non-Carbon table does not support creating MV datamap")
}
val modularPlan = validateMVQuery(sparkSession, logicalPlan)
val updatedQueryWithDb = modularPlan.asCompactSQL
val (timeSeriesColumn, granularity): (String, String) = validateMVTimeSeriesQuery(
logicalPlan,
dataMapSchema)
val fullRebuild = isFullReload(logicalPlan)
var counter = 0
// the CTAS query can have duplicate columns, so take the distinct fields
// so that creating the MV table does not fail
val fields = logicalPlan.output.map { attr =>
if (attr.dataType.isInstanceOf[ArrayType] || attr.dataType.isInstanceOf[StructType] ||
attr.dataType.isInstanceOf[MapType]) {
throw new UnsupportedOperationException(
"MV datamap is not supported for complex datatype columns or for functions " +
"returning complex datatypes: " + attr.name)
}
val name = updateColumnName(attr, counter)
counter += 1
val rawSchema = '`' + name + '`' + ' ' + attr.dataType.typeName
if (attr.dataType.typeName.startsWith("decimal")) {
val (precision, scale) = CommonUtil.getScaleAndPrecision(attr.dataType.catalogString)
Field(column = name,
dataType = Some(attr.dataType.typeName),
name = Some(name),
children = None,
precision = precision,
scale = scale,
rawSchema = rawSchema)
} else {
Field(column = name,
dataType = Some(attr.dataType.typeName),
name = Some(name),
children = None,
rawSchema = rawSchema)
}
}.distinct
val tableProperties = mutable.Map[String, String]()
val parentTables = new util.ArrayList[String]()
val parentTablesList = new util.ArrayList[CarbonTable](selectTables.size)
selectTables.foreach { selectTable =>
val mainCarbonTable = try {
Some(CarbonEnv.getCarbonTable(selectTable.identifier.database,
selectTable.identifier.table)(sparkSession))
} catch {
// Exception handling if it's not a CarbonTable
case ex: Exception =>
throw new MalformedCarbonCommandException(
s"Non-Carbon table does not support creating MV datamap")
}
if (!mainCarbonTable.get.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on NonTransactional table")
}
if (mainCarbonTable.get.isChildTableForMV) {
throw new MalformedCarbonCommandException(
"Cannot create Datamap on child table " + mainCarbonTable.get.getTableUniqueName)
}
parentTables.add(mainCarbonTable.get.getTableName)
if (mainCarbonTable.get.isStreamingSink) {
throw new MalformedCarbonCommandException(
"MV datamap is not supported on streaming tables")
}
parentTablesList.add(mainCarbonTable.get)
}
// Check if load is in progress in any of the parent table mapped to the datamap
parentTablesList.asScala.foreach {
parentTable =>
if (SegmentStatusManager.isLoadInProgressInTable(parentTable)) {
throw new UnsupportedOperationException(
"Cannot create mv datamap table when insert is in progress on parent table: " +
parentTable.getTableName)
}
}
tableProperties.put(CarbonCommonConstants.DATAMAP_NAME, dataMapSchema.getDataMapName)
tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(","))
val finalModularPlan = new SQLBuilder(modularPlan).SQLizer.execute(modularPlan)
val fieldRelationMap = mvUtil.getFieldsAndDataMapFieldsFromPlan(finalModularPlan,
getLogicalRelation(logicalPlan))
// If the dataMap is mapped to a single main table, inherit that table's
// properties; otherwise default table properties are used. Table properties
// given in DMProperties override the inherited ones.
if (parentTablesList.size() == 1) {
DataMapUtil
.inheritTablePropertiesFromMainTable(parentTablesList.get(0),
fields,
fieldRelationMap,
tableProperties)
if (granularity != null) {
if (null != mainTable) {
if (!mainTable.getTableName.equalsIgnoreCase(parentTablesList.get(0).getTableName)) {
throw new MalformedCarbonCommandException(
"Parent table name is different in Create and Select Statement")
}
}
val timeSeriesDataType = parentTablesList
.get(0)
.getTableInfo
.getFactTable
.getListOfColumns
.asScala
.filter(columnSchema => columnSchema.getColumnName
.equalsIgnoreCase(timeSeriesColumn))
.head
.getDataType
if (timeSeriesDataType.equals(DataTypes.DATE) ||
timeSeriesDataType.equals(DataTypes.TIMESTAMP)) {
// if data type is of Date type, then check if given granularity is valid for date type
if (timeSeriesDataType.equals(DataTypes.DATE)) {
TimeSeriesUtil.validateTimeSeriesGranularityForDate(granularity)
}
} else {
throw new MalformedCarbonCommandException(
"TimeSeries Column must be of TimeStamp or Date type")
}
}
}
dmProperties.foreach(t => tableProperties.put(t._1, t._2))
val usePartitioning = dmProperties.getOrElse("partitioning", "true").toBoolean
var partitionerFields: Seq[PartitionerField] = Seq.empty
// Inherit partition from parent table if datamap is mapped to single parent table
if (parentTablesList.size() == 1) {
val partitionInfo = parentTablesList.get(0).getPartitionInfo
val parentPartitionColumns = if (!usePartitioning) {
Seq.empty
} else if (parentTablesList.get(0).isHivePartitionTable) {
partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
} else {
Seq.empty
}
partitionerFields = PartitionUtils
.getPartitionerFields(parentPartitionColumns, fieldRelationMap)
}
var order = 0
val columnOrderMap = new java.util.HashMap[Integer, String]()
if (partitionerFields.nonEmpty) {
fields.foreach { field =>
columnOrderMap.put(order, field.column)
order += 1
}
}
// TODO Use a proper DB
val tableIdentifier =
TableIdentifier(dataMapSchema.getDataMapName + "_table",
selectTables.head.identifier.database)
// prepare the table model from the collected tokens
val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
ifNotExistPresent = ifNotExistsSet,
new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
tableIdentifier.table.toLowerCase,
fields,
partitionerFields,
tableProperties,
None,
isAlterFlow = false,
None)
val tablePath = if (dmProperties.contains("path")) {
dmProperties("path")
} else {
CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
}
CarbonCreateTableCommand(TableNewProcessor(tableModel),
tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession)
// For each main table, collect the set of its columns used by the datamap table and add the mapping to dataMapSchema
val mainTableToColumnsMap = new java.util.HashMap[String, util.Set[String]]()
val mainTableFieldIterator = fieldRelationMap.values.asJava.iterator()
while (mainTableFieldIterator.hasNext) {
val value = mainTableFieldIterator.next()
value.columnTableRelationList.foreach {
columnTableRelation =>
columnTableRelation.foreach {
mainTable =>
if (null == mainTableToColumnsMap.get(mainTable.parentTableName)) {
val columns = new util.HashSet[String]()
columns.add(mainTable.parentColumnName.toLowerCase())
mainTableToColumnsMap.put(mainTable.parentTableName, columns)
} else {
mainTableToColumnsMap.get(mainTable.parentTableName)
.add(mainTable.parentColumnName.toLowerCase())
}
}
}
}
dataMapSchema.setMainTableColumnList(mainTableToColumnsMap)
dataMapSchema.setColumnsOrderMap(columnOrderMap)
if (null != granularity && null != timeSeriesColumn) {
dataMapSchema.setCtasQuery(queryString)
dataMapSchema.setTimeSeries(true)
} else {
dataMapSchema.setCtasQuery(updatedQueryWithDb)
}
dataMapSchema
.setRelationIdentifier(new RelationIdentifier(tableIdentifier.database.get,
tableIdentifier.table,
CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession).getTableId))
val parentIdents = selectTables.map { table =>
val relationIdentifier = new RelationIdentifier(table.database, table.identifier.table, "")
relationIdentifier.setTablePath(FileFactory.getUpdatedFilePath(table.location.toString))
relationIdentifier
}
dataMapSchema.getRelationIdentifier.setTablePath(tablePath)
dataMapSchema.setParentTables(new util.ArrayList[RelationIdentifier](parentIdents.asJava))
dataMapSchema.getProperties.put("full_refresh", fullRebuild.toString)
try {
DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
} catch {
case ex: Exception =>
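// roll back: drop the datamap table created above if saving the schema fails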
val dropTableCommand = CarbonDropTableCommand(true,
new Some[String](dataMapSchema.getRelationIdentifier.getDatabaseName),
dataMapSchema.getRelationIdentifier.getTableName,
true)
dropTableCommand.run(sparkSession)
throw ex
}
}
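/**
* Modularizes the optimized query plan and validates that MV supports it:
* only SPJGH queries, group-by columns present in the projection, no existing
* MV with the same query, and no Coalesce over an aggregate expression.
*/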
private def validateMVQuery(sparkSession: SparkSession,
logicalPlan: LogicalPlan): ModularPlan = {
val dataMapProvider = DataMapManager.get().getDataMapProvider(null,
new DataMapSchema("", DataMapClassProvider.MV.getShortName), sparkSession)
var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
if (catalog == null) {
catalog = new SummaryDatasetCatalog(sparkSession)
}
val modularPlan =
catalog.mvSession.sessionState.modularizer.modularize(
catalog.mvSession.sessionState.optimizer.execute(logicalPlan)).next().semiHarmonized
// Only SPJGH queries are supported: Select, Predicate, Join, Group by and Having.
if (!modularPlan.isSPJGH) {
throw new UnsupportedOperationException("MV is not supported for this query")
}
val isValid = modularPlan match {
case g: GroupBy =>
// Make sure all predicates are present in projections.
g.predicateList.forall{p =>
g.outputList.exists{
case a: Alias =>
a.semanticEquals(p) || a.child.semanticEquals(p)
case other => other.semanticEquals(p)
}
}
case _ => true
}
if (!isValid) {
throw new UnsupportedOperationException(
"Group by columns must be present in project columns")
}
if (catalog.isMVWithSameQueryPresent(logicalPlan)) {
throw new UnsupportedOperationException("MV with same query present")
}
var expressionValid = true
modularPlan.transformExpressions {
case coal@Coalesce(_) if coal.children.exists(
exp => exp.isInstanceOf[AggregateExpression]) =>
expressionValid = false
coal
}
if (!expressionValid) {
throw new UnsupportedOperationException("MV doesn't support Coalesce")
}
modularPlan
}
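/**
* Sanitizes a column name for the MV table by replacing special characters
* (parentheses, spaces, '=', ',', '.', '`'); names exceeding the maximum
* length are truncated and suffixed with the given counter to keep them unique.
*/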
def getUpdatedName(name: String, counter: Int): String = {
var updatedName = name.replace("(", "_")
.replace(")", "")
.replace(" ", "_")
.replace("=", "")
.replace(",", "")
.replace(".", "_")
.replace("`", "")
if (updatedName.length >= CarbonCommonConstants.MAXIMUM_CHAR_LENGTH) {
updatedName = updatedName.substring(0, 110) + CarbonCommonConstants.UNDERSCORE + counter
}
updatedName
}
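/**
* Builds the MV column name for an attribute: the sanitized name, prefixed
* with the attribute's qualifier when one is present.
*/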
def updateColumnName(attr: Attribute, counter: Int): String = {
val name = getUpdatedName(attr.name, counter)
attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name)
}
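// Collects the catalog tables of all relations referenced in the plan.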
def getTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
logicalPlan.collect {
case l: LogicalRelation => l.catalogTable.get
}
}
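// Collects all LogicalRelation nodes of the plan.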
def getLogicalRelation(logicalPlan: LogicalPlan): Seq[LogicalRelation] = {
logicalPlan.collect {
case l: LogicalRelation => l
}
}
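/**
* Drops the dummy MV-skip UDF projections (added by addMVSkipFunction) from
* Project and Aggregate nodes of the plan.
*/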
def dropDummyFunc(plan: LogicalPlan): LogicalPlan = {
plan transform {
case p@Project(exps, child) =>
Project(dropDummyExp(exps), child)
case Aggregate(grp, aggExp, child) =>
Aggregate(
grp,
dropDummyExp(aggExp),
child)
}
}
private def dropDummyExp(exps: Seq[NamedExpression]) = {
exps.flatMap {
case Alias(_: ScalaUDF, name) if name.equalsIgnoreCase(CarbonEnv.MV_SKIP_RULE_UDF) =>
None
case attr: AttributeReference if attr.name.equalsIgnoreCase(CarbonEnv.MV_SKIP_RULE_UDF) =>
None
case other => Some(other)
}
}
/**
* Check if we can do incremental load on the mv table. Aggregate functions
* nested inside other expressions, e.g. sum(a) + sum(b), cannot be loaded
* incrementally and require a full reload.
*/
private def isFullReload(logicalPlan: LogicalPlan): Boolean = {
var isFullReload = false
logicalPlan.transformAllExpressions {
case a: Alias => a
case agg: AggregateExpression =>
// If an Average function is present then go for full refresh
val reload = agg.aggregateFunction match {
case _: Average => true
case _ => false
}
isFullReload = reload || isFullReload
agg
case c: Cast =>
// A Cast directly over an aggregate, e.g. cast(sum(a) as bigint), does not
// by itself force a full reload; its aggregate child is handled by the case above.
c
case exp: Expression =>
// Check any aggregation function present inside other expression.
isFullReload = exp.find {
case agg: AggregateExpression => true
case _ => false
}.isDefined || isFullReload
exp
}
// TODO: Remove this case when incremental data loading is supported for multiple tables
logicalPlan.transformDown {
case join: Join =>
isFullReload = true
join
}
isFullReload
}
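/**
* Builds a map from the subsumee (query) expressions to the corresponding
* subsumer (MV) output expressions, keyed on semantic equality.
*/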
def getAttributeMap(subsumer: Seq[NamedExpression],
subsume: Seq[NamedExpression]): Map[AttributeKey, NamedExpression] = {
// when the datamap is created with duplicate columns, e.g.
// "select sum(age), sum(age) from table", the subsumee will have duplicates,
// so handle that case here
if (subsumer.length == subsume.groupBy(_.name).size) {
subsume.zip(subsumer).flatMap { case (left, right) =>
var tuples = left collect {
case attr: AttributeReference =>
(AttributeKey(attr), createAttrReference(right, attr.name))
}
left match {
case a: Alias =>
tuples = Seq((AttributeKey(a.child), createAttrReference(right, a.name))) ++ tuples
case _ =>
}
Seq((AttributeKey(left), createAttrReference(right, left.name))) ++ tuples
}.toMap
} else {
throw new UnsupportedOperationException("Cannot create mapping with unequal sizes")
}
}
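// Aliases the expression under the given name while keeping its original exprId.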
def createAttrReference(ref: NamedExpression, name: String): Alias = {
Alias(ref, name)(exprId = ref.exprId, qualifier = None)
}
case class AttributeKey(exp: Expression) {
override def equals(other: Any): Boolean = other match {
case attrKey: AttributeKey =>
exp.semanticEquals(attrKey.exp)
case _ => false
}
// Semantic equality cannot be hashed consistently, so the hashcode is a
// constant; map lookups then degrade to a linear scan, like a linked list.
override def hashCode: Int = 1
}
/**
* Updates the expressions to match the subsumer output expressions, i.e. rewrites
* them against the datamap table relation.
*
* @param expressions expressions to be updated
* @param attrMap mapping from query expressions to datamap table expressions
* @param aliasName table alias name
* @return updated expressions
*/
def updateSubsumeAttrs(
expressions: Seq[Expression],
attrMap: Map[AttributeKey, NamedExpression],
aliasName: Option[String],
keepAlias: Boolean = false): Seq[Expression] = {
def getAttribute(exp: Expression) = {
exp match {
case Alias(agg: AggregateExpression, name) =>
agg.aggregateFunction.collect {
case attr: AttributeReference =>
CarbonToSparkAdapter.createAttributeReference(attr.name,
attr.dataType,
attr.nullable,
attr.metadata,
attr.exprId,
aliasName,
attr)
}.head
case Alias(child, name) =>
child
case other => other
}
}
expressions.map {
case alias@Alias(agg: AggregateExpression, name) =>
attrMap.get(AttributeKey(agg)).map { exp =>
CarbonToSparkAdapter.createAliasRef(
getAttribute(exp),
name,
alias.exprId,
alias.qualifier,
alias.explicitMetadata,
Some(alias))
}.getOrElse(alias)
case attr: AttributeReference =>
val uattr = attrMap.get(AttributeKey(attr)).map{a =>
if (keepAlias) {
CarbonToSparkAdapter.createAttributeReference(a.name,
a.dataType,
a.nullable,
a.metadata,
a.exprId,
attr.qualifier,
a)
} else {
a
}
}.getOrElse(attr)
uattr
case alias@Alias(expression: Expression, name) =>
attrMap.get(AttributeKey(expression)).map { exp =>
CarbonToSparkAdapter
.createAliasRef(getAttribute(exp), name, alias.exprId, alias.qualifier,
alias.explicitMetadata, Some(alias))
}.getOrElse(alias)
case expression: Expression =>
val uattr = attrMap.get(AttributeKey(expression))
uattr.getOrElse(expression)
}
}
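/**
* Rewrites the subsumer output list against the datamap table relation and
* re-aliases each result with its original name and exprId.
*/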
def updateOutPutList(
subsumerOutputList: Seq[NamedExpression],
dataMapRltn: Select,
aliasMap: Map[AttributeKey, NamedExpression],
keepAlias: Boolean): Seq[NamedExpression] = {
val outputSel =
updateSubsumeAttrs(
subsumerOutputList,
aliasMap,
Some(dataMapRltn.aliasMap.values.head),
keepAlias).asInstanceOf[Seq[NamedExpression]]
outputSel.zip(subsumerOutputList).map{ case (l, r) =>
l match {
case attr: AttributeReference =>
Alias(attr, r.name)(r.exprId, None)
case a@Alias(attr: AttributeReference, name) =>
Alias(attr, r.name)(r.exprId, None)
case other => other
}
}
}
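// Rewrites attribute references inside the predicates to the datamap table attributes.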
def updateSelectPredicates(
predicates: Seq[Expression],
attrMap: Map[AttributeKey, NamedExpression],
keepAlias: Boolean): Seq[Expression] = {
predicates.map { exp =>
exp transform {
case attr: AttributeReference =>
val uattr = attrMap.get(AttributeKey(attr)).map{a =>
if (keepAlias) {
CarbonToSparkAdapter
.createAttributeReference(a.name,
a.dataType,
a.nullable,
a.metadata,
a.exprId,
attr.qualifier,
a)
} else {
a
}
}.getOrElse(attr)
uattr
}
}
}
/**
* Update the modular plan as per the datamap table relation inside it.
*
* @param subsumer plan to be updated
* @return Updated modular plan.
*/
def updateDataMap(subsumer: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
subsumer match {
case s: Select if s.dataMapTableRelation.isDefined =>
val relation =
s.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select]
val outputList = getUpdatedOutputList(relation.outputList, s.dataMapTableRelation)
// when the output list contains multiple projections of the same column but the
// relation contains distinct columns, the mapping may pair wrong columns, so
// de-duplicate the output list first
val mappings = s.outputList.distinct zip outputList
val oList = for ((o1, o2) <- mappings) yield {
if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
}
relation.copy(outputList = oList).setRewritten()
case g: GroupBy if g.dataMapTableRelation.isDefined =>
val relation =
g.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select]
val in = relation.outputList
val outputList = getUpdatedOutputList(relation.outputList, g.dataMapTableRelation)
val mappings = g.outputList zip outputList
val oList = for ((left, right) <- mappings) yield {
left match {
case Alias(agg: AggregateExpression, _) =>
// Sum, Max and Min roll up with the same function applied to the MV column;
// Count and the statistical aggregates roll up by summing the stored values.
val updatedFun = agg.aggregateFunction match {
case fun: Sum => Some(fun.copy(child = right))
case fun: Max => Some(fun.copy(child = right))
case fun: Min => Some(fun.copy(child = right))
case Count(Seq(_)) | Corr(_, _) | VariancePop(_) | VarianceSamp(_) |
StddevSamp(_) | StddevPop(_) | CovPopulation(_, _) | CovSample(_, _) |
Skewness(_) | Kurtosis(_) =>
Some(Sum(right))
case _ => None
}
updatedFun match {
case Some(uFun) =>
Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId)
case None =>
if (left.name != right.name) Alias(right, left.name)(exprId = left.exprId) else right
}
case _ =>
if (left.name != right.name) Alias(right, left.name)(exprId = left.exprId) else right
}
}
val updatedPredicates = g.predicateList.map { f =>
mappings.find{ case (k, y) =>
k match {
case a: Alias if f.isInstanceOf[Alias] =>
a.child.semanticEquals(f.children.head)
case a: Alias => a.child.semanticEquals(f)
case other => other.semanticEquals(f)
}
} match {
case Some(r) => r._2
case _ => f
}
}
g.copy(outputList = oList,
inputList = in,
predicateList = updatedPredicates,
child = relation,
dataMapTableRelation = None).setRewritten()
case select: Select =>
select.children match {
case Seq(g: GroupBy) if g.dataMapTableRelation.isDefined =>
val relation =
g.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select]
val aliasMap = getAttributeMap(relation.outputList, g.outputList)
// Update the flagspec as per the mv table attributes.
val updatedFlagSpec: Seq[Seq[Any]] = updateFlagSpec(
keepAlias = false,
select,
relation,
aliasMap)
if (isFullRefresh(g.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper])) {
val outputList = getUpdatedOutputList(relation.outputList, g.dataMapTableRelation)
val mappings = g.outputList zip outputList
val oList = for ((o1, o2) <- mappings) yield {
if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
}
val outList = select.outputList.map{ f =>
oList.find(_.name.equals(f.name)).get
}
// Directly keep the relation as child.
select.copy(
outputList = outList,
children = Seq(relation),
aliasMap = relation.aliasMap,
flagSpec = updatedFlagSpec).setRewritten()
} else {
// First find the indices from the child outlist.
val indices = select.outputList.map{c =>
g.outputList.indexWhere{
case al : Alias if c.isInstanceOf[Alias] =>
al.child.semanticEquals(c.asInstanceOf[Alias].child)
case al: Alias if al.child.semanticEquals(c) => true
case other if c.isInstanceOf[Alias] =>
other.semanticEquals(c.asInstanceOf[Alias].child)
case other =>
other.semanticEquals(c) || other.toAttribute.semanticEquals(c)
}
}
val child = updateDataMap(g, rewrite).asInstanceOf[Matchable]
// Build the output list from the converted child's output list using the indices selected above
val outputSel =
indices.map(child.outputList(_)).zip(select.outputList).map { case (l, r) =>
l match {
case a: Alias if r.isInstanceOf[Alias] =>
Alias(a.child, r.name)(exprId = r.exprId)
case a: Alias => a
case other if r.isInstanceOf[Alias] =>
Alias(other, r.name)(exprId = r.exprId)
case other => other
}
}
// TODO Remove the unnecessary columns from selection.
// Only keep columns which are required by parent.
val inputSel = child.outputList
select.copy(
outputList = outputSel,
inputList = inputSel,
flagSpec = updatedFlagSpec,
children = Seq(child)).setRewritten()
}
case _ => select
}
case other => other
}
}
/**
* Updates the flagSpec of the given Select plan with the attributes of the relation's Select plan.
*/
private def updateFlagSpec(keepAlias: Boolean,
select: Select,
relation: Select,
aliasMap: Map[AttributeKey, NamedExpression]): Seq[Seq[Any]] = {
val updatedFlagSpec = select.flagSpec.map { f =>
f.map {
case list: ArrayBuffer[_] =>
list.map { case s: SortOrder =>
val expressions =
updateOutPutList(
Seq(s.child.asInstanceOf[Attribute]),
relation,
aliasMap,
keepAlias = false)
SortOrder(expressions.head, s.direction)
}
// Limit flag values are not SortOrder buffers and pass through unchanged.
case other => other
}
}
updatedFlagSpec
}
/**
* Checks whether a full refresh of the MV table is required. If so, the MV table
* already holds the final results and no aggregation or group by functions need
* to be applied on top of it.
*/
private def isFullRefresh(mvPlanWrapper: MVPlanWrapper): Boolean = {
val fullRefresh = mvPlanWrapper.dataMapSchema.getProperties.get("full_refresh")
if (fullRefresh != null) {
fullRefresh.toBoolean
} else {
false
}
}
// Create aliases from the mappings between the two plans' output lists.
def createAliases(mappings: Seq[(NamedExpression, NamedExpression)]): Seq[NamedExpression] = {
mappings.map{ case (o1, o2) =>
o2 match {
case al: Alias if o1.name == o2.name && o1.exprId != o2.exprId =>
Alias(al.child, o1.name)(exprId = o1.exprId)
case other =>
if (o1.name != o2.name || o1.exprId != o2.exprId) {
Alias(o2, o1.name)(exprId = o1.exprId)
} else {
o2
}
}
}
}
/**
* Rewrite the updated mv query with corresponding MV table.
*/
def rewriteWithMVTable(rewrittenPlan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
if (rewrittenPlan.find(_.rewritten).isDefined) {
val updatedDataMapTablePlan = rewrittenPlan transform {
case s: Select =>
MVHelper.updateDataMap(s, rewrite)
case g: GroupBy =>
MVHelper.updateDataMap(g, rewrite)
}
updatedDataMapTablePlan
} else {
rewrittenPlan
}
}
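/**
* Reorders the output list according to the column order map stored in the
* datamap schema, if present, so that it matches the MV table layout.
*/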
private def getUpdatedOutputList(outputList: Seq[NamedExpression],
dataMapTableRelation: Option[ModularPlan]): Seq[NamedExpression] = {
dataMapTableRelation.collect {
case mv: MVPlanWrapper =>
val dataMapSchema = mv.dataMapSchema
val columnsOrderMap = dataMapSchema.getColumnsOrderMap
if (null != columnsOrderMap && !columnsOrderMap.isEmpty) {
val updatedOutputList = new util.ArrayList[NamedExpression]()
var i = 0
while (i < columnsOrderMap.size()) {
updatedOutputList
.add(outputList.filter(f => f.name.equalsIgnoreCase(columnsOrderMap.get(i))).head)
i = i + 1
}
updatedOutputList.asScala
} else {
outputList
}
case _ => outputList
}.get
}
/**
* Validates an MV timeseries query for the timeseries column and granularity.
* The timeseries UDF takes a Timestamp column, or a Date column cast to Timestamp.
*
* @param logicalPlan to be validated
* @param dataMapSchema to check if it is a lazy/non-lazy datamap
* @return the timeseries column and granularity
*/
private def validateMVTimeSeriesQuery(logicalPlan: LogicalPlan,
dataMapSchema: DataMapSchema): (String, String) = {
var timeSeriesColumn: String = null
var granularity: String = null
logicalPlan.transformExpressions {
case alias@Alias(udf: ScalaUDF, _) =>
if (udf.function.isInstanceOf[TimeSeriesFunction]) {
if (null == timeSeriesColumn && null == granularity) {
udf.children.collect {
case attr: AttributeReference =>
timeSeriesColumn = attr.name
case l: Literal =>
granularity = l.value.toString
case c: Cast =>
c.child match {
case attribute: AttributeReference =>
if (attribute.dataType.isInstanceOf[DateType]) {
timeSeriesColumn = attribute.name
}
case _ =>
}
}
} else {
udf.children.collect {
case attr: AttributeReference =>
if (!attr.name.equalsIgnoreCase(timeSeriesColumn)) {
throw new MalformedCarbonCommandException(
"Multiple timeseries udf functions are defined in Select statement with " +
"different timestamp columns")
}
case l: Literal =>
if (!granularity.equalsIgnoreCase(l.value.toString)) {
throw new MalformedCarbonCommandException(
"Multiple timeseries udf functions are defined in Select statement with " +
"different granularities")
}
}
}
}
alias
}
// if the timeseries column and granularity are both non-null, then validate
if (null != timeSeriesColumn && null != granularity) {
if (dataMapSchema.isLazy) {
throw new MalformedCarbonCommandException(
"MV TimeSeries queries does not support Lazy Rebuild")
}
TimeSeriesUtil.validateTimeSeriesGranularity(granularity)
} else if (null == timeSeriesColumn && null != granularity) {
throw new MalformedCarbonCommandException(
"MV TimeSeries is only supported on Timestamp/Date column")
}
(timeSeriesColumn, granularity)
}
}