/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCastExpression}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, _}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation}
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.types._
import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.preagg.{AggregateQueryPlan, AggregateTableSelector, QueryColumn}
import org.apache.carbondata.core.profiler.ExplainCollector
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
/**
* model class to store aggregate expression logical plan
* and its column schema mapping
* @param expression aggregate expression
* @param columnSchema list of column schema from table
*/
case class AggExpToColumnMappingModel(
expression: Expression,
var columnSchema: Option[Object] = None) {
override def equals(o: Any): Boolean = o match {
case that: AggExpToColumnMappingModel =>
that.expression == this.expression
case _ => false
}
// TODO need to update the hash code generation code; a constant hash code keeps the
// equals/hashCode contract (equal objects hash equally) but degrades hash-based lookups
override def hashCode: Int = 1
}
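// A minimal usage sketch (illustrative only, not part of the rule logic): because equality
// ignores columnSchema, a model built from a normalized expression works as a lookup key
// even before its column schema is resolved, e.g.
//   val key = AggExpToColumnMappingModel(normalizedExp)            // columnSchema = None
//   val hit = AggExpToColumnMappingModel(normalizedExp, Some(col)) // columnSchema resolved
//   key == hit  // true, so set/map lookups with `key` find `hit`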
/**
* Class for applying Pre Aggregate rules
* Responsibilities:
* 1. Check whether the plan is a valid plan for updating the parent table plan
* with the child table
* 2. Update the plan based on the child schema
*
* Rules for updating the plan
* 1. Grouping expression rules
* 1.1 Change the parent attribute reference of the group expression
* to the child attribute reference
*
* 2. Aggregate expression rules
* 2.1 Change the parent attribute reference of the aggregate expression to
* the child attribute reference
* 2.2 Change the count AggregateExpression to Sum, as count is already calculated,
* so in case of the aggregate table we need to apply sum to get the count
* 2.3 In case of the average aggregate function, select 2 columns from the aggregate table
* with aggregation sum and count,
* then add divide(sum(column with sum), sum(column with count)).
* Note: during aggregate table creation, for average the table is created with two columns,
* one for sum(column) and one for count(column), to support rollup
*
* 3. Filter expression rules
* 3.1 Update filter expression attributes with child table attributes
* 4. Update the parent logical relation with the child logical relation
* 5. Order by query rules
* 5.1 Update the project list based on the updated aggregate expression
* 5.2 Update sort order attributes based on the pre aggregate table
* 6. Timeseries function
* 6.1 Validate that the maintable has a timeseries datamap
* 6.2 Validate that the timeseries function is a valid function
* 7. Streaming
* Example 1:
* Query:
* SELECT name, sum(Salary) as totalSalary
* FROM maintable
* GROUP BY name
* UpdatedQuery:
* SELECT name, sum(totalSalary) FROM(
* SELECT name, sum(Salary) as totalSalary
* FROM maintable
* GROUP BY name
* UNION ALL
* SELECT maintable_name, sum(maintable_salary) as totalSalary
* FROM maintable_agg
* GROUP BY maintable_name)
* GROUP BY name
* Example 2:
* Query:
* SELECT name, AVG(Salary) as avgSalary
* FROM maintable
* GROUP BY name
* UpdatedQuery:
* SELECT name, sum(sumSalary) / sum(countSalary)
* FROM(
* SELECT name, sum(Salary) as sumSalary, count(Salary) countSalary
* FROM maintable
* GROUP BY name
* UNION ALL
* SELECT maintable_name, sum(maintable_salary) as sumSalary, count(maintable_salary) countSalary
* FROM maintable_agg
* GROUP BY maintable_name)
* GROUP BY name
*
* Rules for updating the plan in case of a streaming table:
* In case of streaming, data will be fetched from both the fact and the aggregate table,
* as the aggregate table is updated only after each hand-off, so the currently streamed data
* won't be available in the aggregate table.
* 7.1 Add one Union node combining the fact and aggregate table plans to get the data
* from both tables
* 7.2 On top of the Union node add one Aggregate node to aggregate both tables' results
* 7.3 In case of average (avg(column)) special handling is required for streaming
* 7.3.1 The fact plan will be updated to return sum(column) and count(column) to do rollup
* 7.3.2 The aggregate plan will be updated to return sum(column) and count(column) to do rollup
* 7.4 In the newly added Aggregate node all the aggregate expressions must have the same
* expression id as in the fact plan, and the fact plan will be updated with a new expression
* id, as nodes such as order by may refer to it. In Example 1, sum(totalSalary) as totalSalary
* will have the same expression id as in the fact plan, and sum(salary) in the fact plan
* will be updated with a new expression id
*
* @param sparkSession spark session
*/
case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] {
/**
* map for keeping parent attribute reference to child attribute reference;
* this will be used to update the plan in case of join or order by
*/
val updatedExpression = mutable.HashMap[AttributeReference, AttributeReference]()
/**
* parser
*/
lazy val parser = new CarbonSpark2SqlParser
/**
* Below method will be used to validate the logical plan
* @param logicalPlan query logical plan
* @return whether the plan is valid
*/
private def isValidPlan(logicalPlan: LogicalPlan) : Boolean = {
var isValidPlan = true
logicalPlan.transform {
case aggregate@Aggregate(grp, aExp, child) =>
isValidPlan = !aExp.exists { p =>
if (p.isInstanceOf[UnresolvedAlias]) return false
p.name.equals("preAggLoad") || p.name.equals("preAgg")
}
val updatedAggExp = aExp.filterNot(_.name.equalsIgnoreCase("preAggLoad"))
Aggregate(grp, updatedAggExp, child)
}
isValidPlan
}
override def apply(plan: LogicalPlan): LogicalPlan = {
var needAnalysis = true
plan.transformExpressions {
// first check whether any preAgg scala UDF is present in the plan; if so the call is
// from the create preaggregate table flow, so there is no need to transform the query plan
case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase("preAgg") =>
needAnalysis = false
al
case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase("preAggLoad") =>
needAnalysis = false
al
// in case of a query, if any unresolved alias is present then wait for the plan to be
// resolved; return the same plan, as we can transform the plan only when everything is resolved
case unresolveAlias@UnresolvedAlias(_, _) =>
needAnalysis = false
unresolveAlias
case attr@UnresolvedAttribute(_) =>
needAnalysis = false
attr
}
if(needAnalysis) {
needAnalysis = isValidPlan(plan)
if(needAnalysis) {
needAnalysis = validateStreamingTablePlan(plan)
}
}
// if plan is not valid for transformation then return same plan
if (!needAnalysis) {
plan
} else {
val updatedPlan = transformPreAggQueryPlan(plan)
val newPlan = updatePlan(updatedPlan)
newPlan
}
}
/**
* Below method will be used to validate whether the plan is already updated in case of a
* streaming table. For a streaming table a Union node is added to get the data from both
* the fact and the aggregate table, as the aggregate table is updated after each handoff.
* So if the plan is already updated there is no need to transform the plan again
* @param logicalPlan
* query plan
* @return whether need to update the query plan or not
*/
def validateStreamingTablePlan(logicalPlan: LogicalPlan) : Boolean = {
var needTransformation: Boolean = true
logicalPlan.transform {
case union @ Union(Seq(plan1, plan2)) =>
plan2.collect{
case logicalRelation: LogicalRelation if
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
.isChildDataMap =>
needTransformation = false
}
union
}
needTransformation
}
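// For intuition, an informal sketch of the already-rewritten shape the check above detects:
//   Union
//   +- Aggregate(..., fact table scan)
//   +- Aggregate(..., LogicalRelation whose CarbonTable isChildDataMap)
// If the second branch already scans a child datamap table, the plan was rewritten earlier
// and must not be transformed again.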
/**
* Below method will be used to update the child plan
* This will be used for updating expressions such as join conditions,
* order by and project lists
* @param plan child plan
* @return updated plan
*/
def updatePlan(plan: LogicalPlan) : LogicalPlan = {
val updatedPlan = plan transform {
case Aggregate(grp, aggExp, child) =>
Aggregate(
updateExpression(grp),
updateExpression(aggExp.asInstanceOf[Seq[Expression]]).asInstanceOf[Seq[NamedExpression]],
child)
case Filter(filterExp, child) =>
Filter(updateExpression(Seq(filterExp)).head, child)
case Project(pList, child) =>
Project(
updateExpression(pList.asInstanceOf[Seq[Expression]]).asInstanceOf[Seq[NamedExpression]],
child)
case Sort(sortOrders, global, child) =>
Sort(updateSortExpression(sortOrders), global, child)
case Join(left, right, joinType, condition) =>
val updatedCondition = condition match {
case Some(expr) => Some(updateExpression(Seq(expr)).head)
case _ => condition
}
Join(left, right, joinType, updatedCondition)
}
updatedPlan
}
/**
* Below method will be used to update the sort expression
* @param sortExp sort order expression in query
* @return updated sort expression
*/
def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
sortExp map { order =>
SortOrder(order.child transform {
case attr: AttributeReference =>
updatedExpression.find { p => p._1.sameRef(attr) } match {
case Some((_, childAttr)) =>
CarbonToSparkAdapter.createAttributeReference(
childAttr.name,
childAttr.dataType,
childAttr.nullable,
childAttr.metadata,
childAttr.exprId,
attr.qualifier,
attr)
case None =>
attr
}
}, order.direction )
}
}
/**
* Below method will be used to update expressions such as group by expressions
* @param expressions sequence of expression like group by
* @return updated expressions
*/
def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
expressions map { expression =>
expression transform {
case attr: AttributeReference =>
updatedExpression.find { p => p._1.sameRef(attr) } match {
case Some((_, childAttr)) =>
CarbonToSparkAdapter.createAttributeReference(
childAttr.name,
childAttr.dataType,
childAttr.nullable,
childAttr.metadata,
childAttr.exprId,
attr.qualifier,
attr)
case None =>
attr
}
}
}
}
/**
* Below method will be used to validate and transform the main table plan to the child
* table plan. Rules for transforming are as below.
* 1. Grouping expression rules
* 1.1 Change the parent attribute reference of the group expression
* to the child attribute reference
*
* 2. Aggregate expression rules
* 2.1 Change the parent attribute reference of the aggregate expression to
* the child attribute reference
* 2.2 Change the count AggregateExpression to Sum, as count is already calculated,
* so in case of the aggregate table we need to apply sum to get the count
* 2.3 In case of the average aggregate function, select 2 columns from the aggregate table
* with aggregation sum and count, then add divide(sum(column with sum), sum(column with count)).
* Note: during aggregate table creation, for average the table is created with two columns,
* one for sum(column) and one for count(column), to support rollup
* 3. Filter expression rules
* 3.1 Update filter expression attributes with child table attributes
* 4. Update the parent logical relation with the child logical relation
* 5. Timeseries function
* 5.1 Validate that the parent table has a timeseries datamap
* 5.2 Validate that the timeseries function is a valid function
* 6. Streaming
* Rules for updating the plan in case of a streaming table:
* In case of streaming, data will be fetched from both the fact and the aggregate table,
* as the aggregate table is updated only after each hand-off, so the currently streamed data
* won't be available in the aggregate table.
* 6.1 Add one Union node combining the fact and aggregate table plans to
* get the data from both tables
* 6.2 On top of the Union node add one Aggregate node to aggregate both tables' results
* 6.3 In case of average (avg(column)) special handling is required for streaming
* 6.3.1 The fact plan will be updated to return sum(column) and count(column) to do rollup
* 6.3.2 The aggregate plan will be updated to return sum(column) and count(column) to do rollup
* 6.4 In the newly added Aggregate node all the aggregate expressions must have the same
* expression id as in the fact plan, and the fact plan will be updated with a new expression
* id, as nodes such as order by may refer to it. In Example 1, sum(totalSalary) as totalSalary
* will have the same expression id as in the fact plan, and sum(salary) in the fact plan
* will be updated with a new expression id
*
* @param logicalPlan parent logical plan
* @return transformed plan
*/
def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
var isPlanUpdated = false
val updatedPlan = logicalPlan.transform {
case agg@Aggregate(
grExp,
aggExp,
CarbonSubqueryAlias(alias1, child@CarbonSubqueryAlias(alias2, l: LogicalRelation)))
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
metaData.hasAggregateDataMapSchema && !isPlanUpdated =>
val carbonTable = getCarbonTable(l)
if (isSpecificSegmentNotPresent(carbonTable)) {
val list = scala.collection.mutable.HashSet.empty[QueryColumn]
val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
val isValidPlan = extractQueryColumnsFromAggExpression(
grExp,
aggExp,
carbonTable,
list,
aggregateExpressions)
if (isValidPlan) {
val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
aggregateExpressions,
carbonTable,
agg)
if (null != aggDataMapSchema && null != childPlan) {
val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
val (updatedGroupExp, updatedAggExp, newChild, None) =
getUpdatedExpressions(grExp,
aggExp,
child,
None,
aggDataMapSchema,
attributes,
childPlan,
carbonTable,
agg)
isPlanUpdated = true
setExplain(aggDataMapSchema)
val updateAggPlan =
Aggregate(
updatedGroupExp,
updatedAggExp,
CarbonReflectionUtils.getSubqueryAlias(
sparkSession,
Some(alias1),
CarbonReflectionUtils.getSubqueryAlias(
sparkSession,
Some(alias2),
newChild,
None),
None))
getAggregateQueryPlan(
updateAggPlan,
grExp,
aggExp,
carbonTable,
aggDataMapSchema,
agg)
} else {
agg
}
} else {
agg
}
} else {
agg
}
// case for aggregation query
case agg@Aggregate(
grExp,
aggExp,
child@CarbonSubqueryAlias(alias, l: LogicalRelation))
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
metaData.hasAggregateDataMapSchema && !isPlanUpdated =>
val carbonTable = getCarbonTable(l)
if(isSpecificSegmentNotPresent(carbonTable)) {
val list = scala.collection.mutable.HashSet.empty[QueryColumn]
val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
val isValidPlan = extractQueryColumnsFromAggExpression(
grExp,
aggExp,
carbonTable,
list,
aggregateExpressions)
if (isValidPlan) {
val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
aggregateExpressions,
carbonTable,
agg)
if (null != aggDataMapSchema && null != childPlan) {
val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
val (updatedGroupExp, updatedAggExp, newChild, None) =
getUpdatedExpressions(grExp,
aggExp,
child,
None,
aggDataMapSchema,
attributes,
childPlan,
carbonTable,
agg)
isPlanUpdated = true
setExplain(aggDataMapSchema)
val updateAggPlan =
Aggregate(
updatedGroupExp,
updatedAggExp,
CarbonReflectionUtils.getSubqueryAlias(
sparkSession,
Some(alias),
newChild,
None))
getAggregateQueryPlan(
updateAggPlan,
grExp,
aggExp,
carbonTable,
aggDataMapSchema,
agg)
} else {
agg
}
} else {
agg
}
} else {
agg
}
// case of handling aggregation query with filter
case agg@Aggregate(
grExp,
aggExp,
Filter(expression, child@CarbonSubqueryAlias(alias, l: LogicalRelation)))
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
metaData.hasAggregateDataMapSchema && !isPlanUpdated =>
val carbonTable = getCarbonTable(l)
if(isSpecificSegmentNotPresent(carbonTable)) {
val list = scala.collection.mutable.HashSet.empty[QueryColumn]
val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
var isValidPlan = extractQueryColumnsFromAggExpression(
grExp,
aggExp,
carbonTable,
list,
aggregateExpressions)
if (isValidPlan) {
isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression)
}
// getting the columns from filter expression
if (isValidPlan) {
extractColumnFromExpression(expression, list, carbonTable, true)
}
if (isValidPlan) {
val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
aggregateExpressions,
carbonTable,
agg)
if (null != aggDataMapSchema && null != childPlan) {
val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
getUpdatedExpressions(grExp,
aggExp,
child,
Some(expression),
aggDataMapSchema,
attributes,
childPlan,
carbonTable,
agg)
isPlanUpdated = true
setExplain(aggDataMapSchema)
val updateAggPlan =
Aggregate(
updatedGroupExp,
updatedAggExp,
Filter(
updatedFilterExpression.get,
CarbonReflectionUtils.getSubqueryAlias(
sparkSession,
Some(alias),
newChild,
None)))
getAggregateQueryPlan(
updateAggPlan,
grExp,
aggExp,
carbonTable,
aggDataMapSchema,
agg)
} else {
agg
}
} else {
agg
}
} else {
agg
}
case agg@Aggregate(
grExp,
aggExp,
Filter(
expression,
CarbonSubqueryAlias(alias1, child@CarbonSubqueryAlias(alias2, l: LogicalRelation))))
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
metaData.hasAggregateDataMapSchema && !isPlanUpdated =>
val carbonTable = getCarbonTable(l)
if(isSpecificSegmentNotPresent(carbonTable)) {
val list = scala.collection.mutable.HashSet.empty[QueryColumn]
val aggregateExpressions = scala.collection.mutable.HashSet.empty[AggregateExpression]
var isValidPlan = extractQueryColumnsFromAggExpression(
grExp,
aggExp,
carbonTable,
list,
aggregateExpressions)
if (isValidPlan) {
isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(expression)
}
// getting the columns from filter expression
if (isValidPlan) {
extractColumnFromExpression(expression, list, carbonTable, true)
}
if (isValidPlan) {
val (aggDataMapSchema, childPlan) = getChildDataMapForTransformation(list,
aggregateExpressions,
carbonTable,
agg)
if (null != aggDataMapSchema && null != childPlan) {
val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
getUpdatedExpressions(grExp,
aggExp,
child,
Some(expression),
aggDataMapSchema,
attributes,
childPlan,
carbonTable,
agg)
isPlanUpdated = true
setExplain(aggDataMapSchema)
val updateAggPlan =
Aggregate(
updatedGroupExp,
updatedAggExp,
Filter(
updatedFilterExpression.get,
CarbonReflectionUtils.getSubqueryAlias(
sparkSession,
Some(alias1),
CarbonReflectionUtils.getSubqueryAlias(
sparkSession,
Some(alias2),
newChild,
None),
None)))
getAggregateQueryPlan(
updateAggPlan,
grExp,
aggExp,
carbonTable,
aggDataMapSchema,
agg)
} else {
agg
}
} else {
agg
}
} else {
agg
}
}
if(isPlanUpdated) {
CarbonSession.threadSet(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, "true")
}
updatedPlan
}
// set datamap match information for EXPLAIN command
private def setExplain(dataMapSchema: AggregationDataMapSchema): Unit = {
ExplainCollector.recordMatchedOlapDataMap(
dataMapSchema.getProvider.getShortName, dataMapSchema.getDataMapName)
}
/**
* Method to get the aggregate query plan
* @param aggPlan
* aggregate table query plan
* @param grExp
* fact group by expression
* @param aggExp
* fact aggregate expression
* @param carbonTable
* fact table
* @param aggregationDataMapSchema
* selected aggregation data map
* @param factAggPlan
* fact aggregate query plan
* @return updated plan
*/
def getAggregateQueryPlan(aggPlan: LogicalPlan,
grExp: Seq[Expression],
aggExp: Seq[NamedExpression],
carbonTable: CarbonTable,
aggregationDataMapSchema: DataMapSchema,
factAggPlan: LogicalPlan): LogicalPlan = {
// to handle streaming table with pre aggregate
if (carbonTable.isStreamingSink) {
setSegmentsForStreaming(carbonTable, aggregationDataMapSchema)
// get new fact expression
val factExp = updateFactTablePlanForStreaming(factAggPlan)
// get new Aggregate node expression
val aggPlanNew = updateAggTablePlanForStreaming(aggPlan)
val streamingNodeExp = getExpressionsForStreaming(aggExp)
// clear the expression as in case of streaming it is not required
updatedExpression.clear()
// Add Aggregate node to aggregate data from fact and aggregate
Aggregate(
createNewAggGroupBy(grExp, factAggPlan),
streamingNodeExp.asInstanceOf[Seq[NamedExpression]],
// add union node to get the result from both
Union(
factExp,
aggPlanNew))
} else {
aggPlan
}
}
/**
* create group by expression for the newly added Aggregate node
* @param grExp fact group by expression
* @param plan fact query plan
* @return group by expression
*/
private def createNewAggGroupBy(grExp: Seq[Expression], plan: LogicalPlan): Seq[Expression] = {
grExp.map {
case attr: AttributeReference =>
val aggModel = AggExpToColumnMappingModel(
removeQualifiers(PreAggregateUtil.normalizeExprId(attr, plan.allAttributes)))
// use the fact plan group expression when present, otherwise keep the attribute as-is
factPlanGrpExpForStreaming.getOrElse(aggModel, attr)
case exp: Expression =>
val aggModel = AggExpToColumnMappingModel(
removeQualifiers(PreAggregateUtil.normalizeExprId(exp, plan.allAttributes)))
factPlanGrpExpForStreaming.get(aggModel).get
}
}
/**
* Method to set the segments when a query is fired on a streaming table with pre aggregate.
* Adding a property streaming_seg so while removing from session params we can differentiate
* that it was set from CarbonPreAggregateRules
* @param parentTable
* parent carbon table
* @param dataMapSchema
* child datamap schema
*/
def setSegmentsForStreaming(parentTable: CarbonTable, dataMapSchema: DataMapSchema): Unit = {
val mainTableKey = parentTable.getDatabaseName + '.' + parentTable.getTableName
val factManager = new SegmentStatusManager(parentTable.getAbsoluteTableIdentifier)
CarbonSession
.threadSet(CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING + mainTableKey, "true")
CarbonSession
.threadSet(
CarbonCommonConstants.CARBON_INPUT_SEGMENTS + mainTableKey,
factManager.getValidAndInvalidSegments.getValidSegments.asScala.mkString(","))
CarbonSession
.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + mainTableKey, "true")
// below code is for aggregate table
val identifier = TableIdentifier(
dataMapSchema.getChildSchema.getTableName,
Some(parentTable.getDatabaseName))
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetaStore
val carbonRelation =
catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
val segmentStatusManager = new SegmentStatusManager(carbonRelation.carbonTable
.getAbsoluteTableIdentifier)
val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
.mkString(",")
val childTableKey = carbonRelation.carbonTable.getDatabaseName + '.' +
carbonRelation.carbonTable.getTableName
CarbonSession
.threadSet(CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING + childTableKey, "true")
CarbonSession
.threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + childTableKey, validSegments)
CarbonSession
.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + childTableKey, "false")
}
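// Informal sketch of the thread-local session properties set above, for a hypothetical
// parent table default.maintable with child table default.maintable_agg (constant names
// abbreviated; segment ids are made up):
//   <QUERY_ON_PRE_AGG_STREAMING>default.maintable         = "true"
//   <CARBON_INPUT_SEGMENTS>default.maintable              = "0,1,2"  (valid fact segments)
//   <VALIDATE_CARBON_INPUT_SEGMENTS>default.maintable     = "true"
//   <QUERY_ON_PRE_AGG_STREAMING>default.maintable_agg     = "true"
//   <CARBON_INPUT_SEGMENTS>default.maintable_agg          = "0,1"    (valid agg segments)
//   <VALIDATE_CARBON_INPUT_SEGMENTS>default.maintable_agg = "false"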
/**
* Map to keep expression name to alias mapping. This will be used while adding a node when
* the plan for a streaming table is updated.
* Note: in case of average, the fact table plan will have two aliases, sum(column) and
* count(column), to support rollup
*/
private val factPlanExpForStreaming = mutable.HashMap[String, Seq[NamedExpression]]()
private val factPlanGrpExpForStreaming = mutable
.HashMap[AggExpToColumnMappingModel, AttributeReference]()
/**
* Below method will be used to get the expressions for the Aggregate node added for streaming.
* The expression ids will be the same as in the fact plan, as they can be referred to in the query
*
* @param aggExp
* main table aggregate expression
* @return updated aggregate expression
*/
private def getExpressionsForStreaming(aggExp: Seq[Expression]): Seq[Expression] = {
val updatedExp = aggExp map {
case attr: AttributeReference =>
attr
case alias@Alias(aggExp: AggregateExpression, name) =>
// in case of aggregate expression get the fact alias based on expression name
val factAlias = factPlanExpForStreaming(name)
// create attribute reference object for each expression
val attrs = factAlias.map { factExp =>
CarbonToSparkAdapter.createAttributeReference(
name,
alias.dataType,
alias.nullable,
Metadata.empty,
factExp.exprId,
alias.qualifier,
alias)
}
// add aggregate function in Aggregate node added for handling streaming
// to aggregate results from fact and aggregate table
val updatedAggExp = getAggregateExpressionForAggregation(aggExp, attrs)
// same reference id will be used as it can be used by above nodes in the plan like
// sort, project, join
CarbonToSparkAdapter.createAliasRef(
updatedAggExp.head,
name,
alias.exprId,
alias.qualifier,
Option(alias.metadata),
Some(alias))
case alias@Alias(expression, name) =>
CarbonToSparkAdapter.createAttributeReference(
name,
alias.dataType,
alias.nullable,
Metadata.empty,
alias.exprId,
alias.qualifier,
alias)
}
updatedExp
}
/**
* Below method will be used to update the fact plan in case of a streaming table.
* This is required to handle the average aggregate function, as in case of average we need
* to return two columns, sum(column) and count(column), to get the correct result
*
* @param logicalPlan
* fact table Aggregate plan
* @return updated aggregate plan for fact
*/
private def updateFactTablePlanForStreaming(logicalPlan: LogicalPlan) : LogicalPlan = {
// only aggregate expression needs to be updated
logicalPlan.transform{
case agg@Aggregate(grpExp, aggExp, _) =>
agg
.copy(aggregateExpressions = updateAggExpInFactForStreaming(aggExp, grpExp, agg)
.asInstanceOf[Seq[NamedExpression]])
}
}
/**
* Below method will be used to update the aggregate table plan for streaming
* @param logicalPlan
* aggregate table logical plan
* @return updated logical plan
*/
private def updateAggTablePlanForStreaming(logicalPlan: LogicalPlan) : LogicalPlan = {
// only aggregate expression needs to be updated
logicalPlan.transform{
case agg@Aggregate(grpExp, aggExp, _) =>
agg
.copy(aggregateExpressions = updateAggExpInAggForStreaming(aggExp, grpExp, agg)
.asInstanceOf[Seq[NamedExpression]])
}
}
/**
* Below method will be used to update the aggregate plan for streaming
* @param namedExp
* aggregate expression
* @param grpExp
* group by expression
* @param plan
* aggregate query plan
* @return updated aggregate expression
*/
private def updateAggExpInAggForStreaming(namedExp : Seq[NamedExpression],
grpExp: Seq[Expression], plan: LogicalPlan) : Seq[Expression] = {
// removing aliases from expressions to compare with the grouping expressions,
// as in case of alias all the projection columns will be updated with the alias
val updatedExp = namedExp.map {
case Alias(attr: AttributeReference, name) =>
attr
case exp: Expression =>
exp
}
addGrpExpToAggExp(grpExp, updatedExp, plan)
}
/**
* below method will be used to update the aggregate expressions with missing
* group by expressions, when only aggregate expressions are selected in the query
*
* @param grpExp
* group by expressions
* @param aggExp
* aggregate expressions
* @param plan
* logical plan
* @return updated aggregate expression
*/
private def addGrpExpToAggExp(grpExp: Seq[Expression],
aggExp: Seq[Expression],
plan: LogicalPlan): Seq[Expression] = {
// set to add all the current aggregate expression
val expressions = mutable.LinkedHashSet.empty[AggExpToColumnMappingModel]
aggExp.foreach {
case Alias(exp, _) =>
expressions +=
AggExpToColumnMappingModel(
PreAggregateUtil.normalizeExprId(exp, plan.allAttributes), None)
case attr: AttributeReference =>
expressions +=
AggExpToColumnMappingModel(
PreAggregateUtil.normalizeExprId(attr, plan.allAttributes), None)
}
val newAggExp = new ArrayBuffer[Expression]
newAggExp ++= aggExp
// for each group by expression check if already present in set if it is present
// then no need to add otherwise add
var counter = 0
grpExp.foreach{gExp =>
val normalizedExp = AggExpToColumnMappingModel(
PreAggregateUtil.normalizeExprId(gExp, plan.allAttributes), None)
if(!expressions.contains(normalizedExp)) {
gExp match {
case attr: AttributeReference =>
newAggExp += attr
case exp: Expression =>
newAggExp += CarbonToSparkAdapter.createAliasRef(
exp,
"dummy_" + counter,
NamedExpression.newExprId)
counter = counter + 1
}
}
}
newAggExp
}
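// Informal example: for `SELECT sum(salary) FROM maintable GROUP BY name, substring(city, 0, 2)`
// the aggregate list [sum(salary)] is extended to
// [sum(salary), name, substring(city, 0, 2) AS dummy_0]
// so the Aggregate node exposes every grouping expression (expression ids omitted).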
/**
* Below method will be used to update the aggregate expression for the streaming fact
* table plan
* @param namedExp
* streaming fact plan aggregate expression
* @param grpExp
* group by expression
* @param plan
* fact query plan
* @return
* updated streaming fact plan aggregate expression
*/
private def updateAggExpInFactForStreaming(namedExp : Seq[NamedExpression],
grpExp: Seq[Expression], plan: LogicalPlan) : Seq[Expression] = {
val addedExp = addGrpExpToAggExp(grpExp, namedExp, plan)
val updatedExp = addedExp.flatMap {
case attr: AttributeReference =>
Seq(attr)
case alias@Alias(aggExp: AggregateExpression, name) =>
// get the new aggregate expression
val newAggExp = getAggFunctionForFactStreaming(aggExp)
val updatedExp = newAggExp.map { exp =>
CarbonToSparkAdapter.createAliasRef(exp,
name,
NamedExpression.newExprId,
alias.qualifier,
Some(alias.metadata),
Some(alias))
}
// adding to map which will be used while Adding an Aggregate node for handling streaming
// table plan change
factPlanExpForStreaming.put(name, updatedExp)
updatedExp
case alias@Alias(exp: Expression, name) =>
val newAlias = Seq(alias)
val attr = CarbonToSparkAdapter.createAttributeReference(name,
alias.dataType,
alias.nullable,
alias.metadata,
alias.exprId,
alias.qualifier,
alias)
factPlanGrpExpForStreaming.put(
AggExpToColumnMappingModel(
removeQualifiers(PreAggregateUtil.normalizeExprId(exp, plan.allAttributes))),
attr)
factPlanExpForStreaming.put(name, newAlias)
newAlias
}
updatedExp
}
/**
* Below method will be used to update the fact table query aggregate function expression
* Rules for updating the expression.
* In case of average return sum(expression), count(expression) to get the correct result
* @param aggExp
* actual query aggregate expression
* @return seq of expression as in case of average we need to return two sum and count
*
*/
def getAggFunctionForFactStreaming(aggExp: AggregateExpression): Seq[Expression] = {
aggExp.aggregateFunction match {
case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
val newExp = Seq(AggregateExpression(Sum(Cast(exp, DoubleType)),
aggExp.mode,
isDistinct = false),
Cast(AggregateExpression(Count(exp), aggExp.mode, false), DoubleType))
newExp
case Average(exp: Expression) =>
val dataType =
if (exp.dataType.isInstanceOf[DecimalType]) {
// decimal must not go as double precision.
exp.dataType.asInstanceOf[DecimalType]
} else {
DoubleType
}
val newExp = Seq(AggregateExpression(Sum(Cast(exp, dataType)), aggExp.mode, false),
Cast(AggregateExpression(Count(exp), aggExp.mode, false), dataType))
newExp
case _ =>
val newExp = Seq(aggExp)
newExp
}
}
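// Informal example of the rewrite above: avg(salary) on the streaming fact plan becomes
// the pair
//   sum(cast(salary as double)), cast(count(salary) as double)
// which the Aggregate node added later rolls up into a single divide.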
/**
* Below method will be used to validate query plan and get the proper aggregation data map schema
* and child relation plan object if plan is valid for transformation
* @param queryColumns list of query columns from projection and filter
* @param aggregateExpressions list of aggregate expression (aggregate function)
* @param carbonTable parent carbon table
* @param parentLogicalPlan parent logical relation
* @return if plan is valid then aggregation data map schema and its relation plan
*/
def getChildDataMapForTransformation(queryColumns: scala.collection.mutable.HashSet[QueryColumn],
aggregateExpressions: scala.collection.mutable.HashSet[AggregateExpression],
carbonTable: CarbonTable,
parentLogicalPlan: LogicalPlan): (AggregationDataMapSchema, LogicalPlan) = {
// getting all the projection columns
val listProjectionColumn = queryColumns
.filter(queryColumn => !queryColumn.isFilterColumn)
.toList
// getting all the filter columns
val listFilterColumn = queryColumns
.filter(queryColumn => queryColumn.isFilterColumn)
.toList
val isProjectionColumnPresent = (listProjectionColumn.size + listFilterColumn.size) > 0
// create a query plan object which will be used to select the list of pre aggregate tables
// matches with this plan
val queryPlan = new AggregateQueryPlan(listProjectionColumn.asJava, listFilterColumn.asJava)
// create aggregate table selector object
val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
// select the list of valid child tables
val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
// if the query has only aggregate expressions then the selected data map list will be
// empty; in that case validate all the child data maps, otherwise validate the selected ones
var selectedAggMaps = if (isProjectionColumnPresent) {
selectedDataMapSchemas
} else {
carbonTable.getTableInfo.getDataMapSchemaList
}
// if it does not match with any pre aggregate table return the same plan
if (!selectedAggMaps.isEmpty) {
// filter the selected child schema based on size to select the pre-aggregate tables
// that are enabled
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetaStore
val relationBuffer = selectedAggMaps.asScala.map { selectedDataMapSchema =>
val identifier = TableIdentifier(
selectedDataMapSchema.getRelationIdentifier.getTableName,
Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
val carbonRelation =
catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
val relation = sparkSession.sessionState.catalog.lookupRelation(identifier)
(selectedDataMapSchema, carbonRelation, relation)
}.filter(_._2.sizeInBytes != 0L).sortBy(_._2.sizeInBytes)
if (relationBuffer.isEmpty) {
// If the size of relation Buffer is 0 then it means that none of the pre-aggregate
// tables have data yet.
// In this case we would return the original plan so that the query hits the parent
// table.
(null, null)
} else {
// if query does not have any aggregate function no need to validate the same
val tuple = if (aggregateExpressions.nonEmpty && !selectedAggMaps.isEmpty) {
relationBuffer.collectFirst {
case a@(datamapSchema, _, _)
if validateAggregateExpression(datamapSchema,
carbonTable,
parentLogicalPlan,
aggregateExpressions.toSeq) =>
a
}
} else {
Some(relationBuffer.head)
}
tuple match {
case Some((dataMapSchema, _, logicalPlan)) => (dataMapSchema
.asInstanceOf[AggregationDataMapSchema], new FindDataSourceTable(sparkSession)
.apply(logicalPlan))
case None => (null, null)
}
// relationBuffer is sorted by size above, so the first match is the smallest table
}
} else {
(null, null)
}
}
/**
* Below method will be used to validate the query aggregate expressions against a data map
* @param selectedDataMap data map schema
* @param carbonTable parent carbon table
* @param parentLogicalPlan parent logical plan
* @param queryAggExpLogicalPlans query aggregate expression logical plans
* @return whether the data map satisfies all the query aggregate expressions
*/
def validateAggregateExpression(selectedDataMap: DataMapSchema,
carbonTable: CarbonTable,
parentLogicalPlan: LogicalPlan,
queryAggExpLogicalPlans: Seq[AggregateExpression]): Boolean = {
val mappingModel = getExpressionToColumnMapping(selectedDataMap,
carbonTable,
parentLogicalPlan)
queryAggExpLogicalPlans.forall{p =>
mappingModel.exists{m =>
matchExpression(
PreAggregateUtil.normalizeExprId(p, parentLogicalPlan.allAttributes),
m.expression)}
}
}
/**
* Below method will be used to update the expression by removing its qualifiers
* @param expression
* expression
* @return updated expression
*/
private def removeQualifiers(expression: Expression) : Expression = {
expression.transform {
case attr: AttributeReference =>
CarbonToSparkAdapter.createAttributeReference(
attr.name,
attr.dataType,
attr.nullable,
attr.metadata,
attr.exprId,
None,
attr)
}
}
/**
* Below method will be used to match two expressions
* @param firstExp
* first expression
* @param secondExp
* second expression
* @return whether the expressions match, ignoring qualifiers
*/
private def matchExpression(firstExp: Expression, secondExp: Expression) : Boolean = {
val first = removeQualifiers(firstExp)
val second = removeQualifiers(secondExp)
first == second
}
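// Informally: with qualifiers removed, sum(t1.salary) matches sum(salary) because only the
// qualifier differed, while sum(salary) and count(salary) still do not match.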
/**
* Below method will be used to get the logical plan for each aggregate expression in the
* child data map and its column schema mapping. If the mapping is already present
* then it is reused, otherwise it is generated and stored in the aggregation data map
* @param selectedDataMap child data map
* @param carbonTable parent table
* @param parentLogicalPlan logical relation of actual plan
* @return map of logical plan for each aggregate expression in child query and its column mapping
*/
def getExpressionToColumnMapping(selectedDataMap: DataMapSchema,
carbonTable: CarbonTable,
parentLogicalPlan: LogicalPlan): mutable.Set[AggExpToColumnMappingModel] = {
val aggDataMapSchema = selectedDataMap.asInstanceOf[AggregationDataMapSchema]
if(null == aggDataMapSchema.getAggExpToColumnMapping) {
// add the preAgg UDF so that the PreAggregate rules skip this plan
val childDataMapQueryString = parser.addPreAggFunction(
PreAggregateUtil.getChildQuery(aggDataMapSchema))
// get the logical plan
val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan
// getting all aggregate expression from query
val dataMapAggExp = getAggregateExpFromChildDataMap(aggPlan)
// in case of average child table will have two columns which will be stored in sequence
// so for average expression we need to get two columns for mapping
var counter = 0
// sorting the columns based on schema ordinal so search will give proper result
val sortedColumnList = aggDataMapSchema.getChildSchema.getListOfColumns.asScala
.sortBy(_.getSchemaOrdinal)
val expressionToColumnMapping = mutable.LinkedHashSet.empty[AggExpToColumnMappingModel]
dataMapAggExp.foreach { aggExp =>
val updatedExp = PreAggregateUtil.normalizeExprId(aggExp, aggPlan.allAttributes)
val model = AggExpToColumnMappingModel(updatedExp, None)
if (!expressionToColumnMapping.contains(model)) {
// check if aggregate expression is of type avg
// get the columns
val columnSchema = aggDataMapSchema
.getAggColumnBasedOnIndex(counter, sortedColumnList.asJava)
// increment the counter so that when the above code is executed for the next expression
// it will search from that schema ordinal
counter = columnSchema.getSchemaOrdinal + 1
model.columnSchema = Some(columnSchema)
expressionToColumnMapping += model
}
}
aggDataMapSchema.setAggExpToColumnMapping(expressionToColumnMapping.asJava)
// return the mapping
expressionToColumnMapping
} else {
aggDataMapSchema.getAggExpToColumnMapping
.asInstanceOf[java.util.Set[AggExpToColumnMappingModel]].asScala
.asInstanceOf[mutable.LinkedHashSet[AggExpToColumnMappingModel]]
}
}
/**
* Below method will be used to get aggregate expression
* @param logicalPlan logical plan
* @return list of aggregate expression
*/
def getAggregateExpFromChildDataMap(logicalPlan: LogicalPlan): Seq[AggregateExpression] = {
val list = scala.collection.mutable.ListBuffer.empty[AggregateExpression]
logicalPlan match {
case _@Aggregate(_, aggExp, _) =>
aggExp map {
case Alias(attr: AggregateExpression, _) =>
list ++= PreAggregateUtil.validateAggregateFunctionAndGetFields(attr)
case _ =>
}
}
list
}
/**
* Below method will be used to check whether a specific segment is set for the maintable.
* If it is set then there is no need to transform the plan and the query will be executed
* on the maintable
* @param carbonTable parent table
* @return true if no specific segment is set in the session params
*/
def isSpecificSegmentNotPresent(carbonTable: CarbonTable) : Boolean = {
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (carbonSessionInfo != null) {
carbonSessionInfo.getSessionParams
.getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
.getDatabaseName + "." + carbonTable.getTableName, "").isEmpty
} else {
true
}
}
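// e.g. after `SET carbon.input.segments.default.maintable = 1,2` (database and table names
// hypothetical) the property above is non-empty, this method returns false, and the query
// is answered from the maintable instead of a pre-aggregate table.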
/**
* Below method will be used to extract the query columns from a
* filter expression and add them to the query column set
* @param expression filter expression
* @param queryColumns query column set
* @param carbonTable parent table
* @param isFilterColumn whether the extracted columns are filter columns
*/
def extractColumnFromExpression(expression: Expression,
queryColumns: scala.collection.mutable.HashSet[QueryColumn],
carbonTable: CarbonTable,
isFilterColumn: Boolean = false) {
// map maintaining each attribute reference present in the filter to the timeseries
// function, if applied; this is added to avoid duplicate columns
val mapOfColumnSeriesFun = scala.collection.mutable.HashMap.empty[AttributeReference, String]
expression.transform {
case attr: AttributeReference =>
if (mapOfColumnSeriesFun.get(attr).isEmpty) {
mapOfColumnSeriesFun.put(attr, null)
}
attr
case udf@CarbonScalaUDF(_) =>
// for handling timeseries function
if (udf.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase(
"org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction") &&
CarbonUtil.hasTimeSeriesDataMap(carbonTable)) {
mapOfColumnSeriesFun.put(udf.children.head.asInstanceOf[AttributeReference],
udf.children.last.asInstanceOf[Literal].value.toString)
} else {
// for any other scala udf
udf.transform {
case attr: AttributeReference =>
if (mapOfColumnSeriesFun.get(attr).isEmpty) {
mapOfColumnSeriesFun.put(attr, null)
}
attr
}
}
udf
}
mapOfColumnSeriesFun.foreach { f =>
if (f._2 == null) {
queryColumns +=
getQueryColumn(f._1.name, carbonTable, isFilterColumn)
} else {
queryColumns += getQueryColumn(f._1.name,
carbonTable,
isFilterColumn,
timeseriesFunction = f._2)
}
}
}
/**
* Below method will be used to get the child attribute reference
* based on parent name
*
* @param dataMapSchema child schema
* @param attributeReference parent attribute reference
* @param attributes child logical relation
* @param canBeNull whether a missing child column is allowed; when false a missing
* column raises an AnalysisException
* @return child attribute reference
*/
def getChildAttributeReference(dataMapSchema: DataMapSchema,
attributeReference: AttributeReference,
attributes: Seq[AttributeReference],
canBeNull: Boolean = false,
timeseriesFunction: String = ""): AttributeReference = {
val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema]
val columnSchema = if (timeseriesFunction.isEmpty) {
aggregationDataMapSchema.getChildColByParentColName(attributeReference.name.toLowerCase)
} else {
aggregationDataMapSchema
.getTimeseriesChildColByParent(attributeReference.name.toLowerCase,
timeseriesFunction)
}
// here the column schema cannot be null; if it is null then the aggregate table
// selection logic has a problem
if (!canBeNull && null == columnSchema) {
throw new AnalysisException("Column does not exist in Pre Aggregate table")
}
if(null == columnSchema && canBeNull) {
null
} else {
// finding the child attribute from child logical relation
attributes.find(p => p.name.equals(columnSchema.getColumnName)).get
}
}
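// Informal example: following the naming convention in the class-level examples, a parent
// attribute `name` would resolve to the child column `maintable_name` of the pre-aggregate
// table's logical relation.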
/**
* Below method will be used to get the updated expressions for the pre aggregated table.
* It will replace the attributes of the actual plan with child table attributes.
* The update is done for the below expressions:
* 1. Grouping expression
* 2. aggregate expression
* 3. child logical plan
* 4. filter expression if present
*
* @param groupingExpressions actual plan grouping expression
* @param aggregateExpressions actual plan aggregate expression
* @param child child logical plan
* @param filterExpression filter expression
* @param aggDataMapSchema pre aggregate table schema
* @param attributes pre aggregate table logical relation
* @param aggPlan aggregate logical plan
* @return tuple of(updated grouping expression,
* updated aggregate expression,
* updated child logical plan,
* updated filter expression if present in actual plan)
*/
def getUpdatedExpressions(groupingExpressions: Seq[Expression],
aggregateExpressions: Seq[NamedExpression],
child: LogicalPlan, filterExpression: Option[Expression] = None,
aggDataMapSchema: AggregationDataMapSchema,
attributes: Seq[AttributeReference],
aggPlan: LogicalPlan,
parentTable: CarbonTable,
parentLogicalPlan: LogicalPlan): (Seq[Expression], Seq[NamedExpression], LogicalPlan,
Option[Expression]) = {
val aggExpColumnMapping = if (null != aggDataMapSchema.getAggExpToColumnMapping) {
Some(aggDataMapSchema.getAggExpToColumnMapping
.asInstanceOf[java.util.Set[AggExpToColumnMappingModel]].asScala
.asInstanceOf[mutable.LinkedHashSet[AggExpToColumnMappingModel]])
} else {
None
}
// transforming the group by expression attributes with child attributes
val updatedGroupExp = groupingExpressions.map { exp =>
exp.transform {
case attr: AttributeReference =>
val childAttr = getChildAttributeReference(aggDataMapSchema, attr, attributes)
childAttr
}
}
// below code is for updating the aggregate expressions.
// Note: when updating an aggregate expression we need to return an alias, as
// the final result must be shown based on the actual query.
// For example, if the query is "select name from table group by name" and
// we only update the attributes, the child table column name would show up in the final
// output; to handle this, if an attribute does not have an alias we return an alias of
// the parent table column name.
// Rules for updating an aggregate expression:
// 1. If it matches an attribute reference, return an alias of the child attribute reference
// 2. If it matches an alias, return the same alias with the child attribute reference
// 3. If it matches an alias of any supported aggregate function, return the aggregate function
// with the child attribute reference. Please check the class level documentation for how and
// when the aggregate function is updated
val updatedAggExp = aggregateExpressions.flatMap {
// case for attribute reference
case attr: AttributeReference =>
val childAttr = getChildAttributeReference(aggDataMapSchema,
attr,
attributes)
val newExpressionId = NamedExpression.newExprId
val childTableAttr = CarbonToSparkAdapter.createAttributeReference(attr.name,
childAttr.dataType,
childAttr.nullable,
childAttr.metadata,
newExpressionId,
childAttr.qualifier,
attr)
updatedExpression.put(attr, childTableAttr)
// returning the alias to show proper column name in output
Seq(Alias(childAttr,
attr.name)(newExpressionId,
childAttr.qualifier).asInstanceOf[NamedExpression])
// case for alias
case alias@Alias(attr: AttributeReference, name) =>
val childAttr = getChildAttributeReference(aggDataMapSchema,
attr,
attributes)
val newExpressionId = NamedExpression.newExprId
val parentTableAttr = CarbonToSparkAdapter.createAttributeReference(name,
alias.dataType,
alias.nullable,
Metadata.empty,
alias.exprId,
alias.qualifier,
alias)
val childTableAttr = CarbonToSparkAdapter.createAttributeReference(name,
alias.dataType,
alias.nullable,
Metadata.empty,
newExpressionId,
alias.qualifier,
alias)
updatedExpression.put(parentTableAttr, childTableAttr)
// returning alias with child attribute reference
Seq(Alias(childAttr,
name)(newExpressionId,
childAttr.qualifier).asInstanceOf[NamedExpression])
// for aggregate function case
case alias@Alias(attr: AggregateExpression, name) =>
// get the updated aggregate function
val aggExp = if (aggExpColumnMapping.isDefined) {
getUpdatedAggregateExpressionForChild(attr,
aggDataMapSchema,
attributes,
parentTable,
parentLogicalPlan,
aggExpColumnMapping.get,
parentTable.isStreamingSink)
} else {
Seq(attr)
}
if(!parentTable.isStreamingSink) {
// for normal table
// generate new expression id for child
val newExpressionId = NamedExpression.newExprId
// create a parent attribute reference which will be replaced on nodes that may refer
// to it, such as sort or join
val parentTableAttr = CarbonToSparkAdapter.createAttributeReference(name,
alias.dataType,
alias.nullable,
Metadata.empty,
alias.exprId,
alias.qualifier,
alias)
// creating a child attribute reference which will be replaced
val childTableAttr = CarbonToSparkAdapter.createAttributeReference(name,
alias.dataType,
alias.nullable,
Metadata.empty,
newExpressionId,
alias.qualifier,
alias)
// adding to map; it will be used when updating other nodes like sort, join and project
updatedExpression.put(parentTableAttr, childTableAttr)
// returning alias with child attribute reference
Seq(Alias(aggExp.head,
name)(newExpressionId,
alias.qualifier).asInstanceOf[NamedExpression])
} else {
// for streaming table
// create alias for aggregate table
val aggExpForStreaming = aggExp.map{ exp =>
CarbonToSparkAdapter.createAliasRef(exp,
name,
NamedExpression.newExprId,
alias.qualifier,
Some(alias.metadata),
Some(alias)).asInstanceOf[NamedExpression]
}
aggExpForStreaming
}
case alias@Alias(expression: Expression, name) =>
val updatedExp =
if (expression.isInstanceOf[ScalaUDF] &&
expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase(
"org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction")) {
expression.asInstanceOf[ScalaUDF].transform {
case attr: AttributeReference =>
val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
attr,
attributes,
timeseriesFunction =
expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal].value
.toString)
childAttributeReference
}
} else {
expression.transform{
case attr: AttributeReference =>
val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
attr,
attributes)
childAttributeReference
}
}
val newExpressionId = NamedExpression.newExprId
val parentTableAttr = CarbonToSparkAdapter.createAttributeReference(name,
alias.dataType,
alias.nullable,
Metadata.empty,
alias.exprId,
alias.qualifier,
alias)
val childTableAttr = CarbonToSparkAdapter.createAttributeReference(name,
alias.dataType,
alias.nullable,
Metadata.empty,
newExpressionId,
alias.qualifier,
alias)
updatedExpression.put(parentTableAttr, childTableAttr)
Seq(Alias(updatedExp, name)(newExpressionId,
alias.qualifier).asInstanceOf[NamedExpression])
}
// transforming the logical relation
val newChild = child.transform {
case _: LogicalRelation =>
aggPlan
case _: SubqueryAlias =>
aggPlan match {
case s: SubqueryAlias => s.child
case others => others
}
}
// updating the filter expression if present
val updatedFilterExpression = if (filterExpression.isDefined) {
val filterExp = filterExpression.get
Some(filterExp.transform {
case attr: AttributeReference =>
getChildAttributeReference(aggDataMapSchema, attr, attributes)
})
} else {
None
}
(updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression)
}
/**
* Below method will be used to get the aggregate expression based on match
* Aggregate expression update rules
* 1 Change the count AggregateExpression to Sum as count
* is already calculated so in case of aggregate table
* we need to apply sum to get the count
* 2 In case of average aggregate function select 2 columns from aggregate table
* with aggregation sum and count.
* Then add divide(sum(column with sum), sum(column with count)).
* Note: During aggregate table creation for average aggregation function
* table will be created with two columns one for sum(column) and count(column)
* to support rollup
*
* @param aggExp aggregate expression
* @param dataMapSchema child data map schema
* @param attributes child logical relation
* @param parentTable parent carbon table
* @param parentLogicalPlan logical relation
* @return updated expression
*/
def getUpdatedAggregateExpressionForChild(aggExp: AggregateExpression,
dataMapSchema: AggregationDataMapSchema,
attributes: Seq[AttributeReference],
parentTable: CarbonTable,
parentLogicalPlan: LogicalPlan,
aggExpColumnMapping: mutable.LinkedHashSet[AggExpToColumnMappingModel],
isStreamingTable: Boolean):
Seq[Expression] = {
// get the updated aggregate expression, in case of average column
// it will be divided in two aggergate expression
val updatedAggExp = PreAggregateUtil.validateAggregateFunctionAndGetFields(aggExp)
// get the attributes to be updated for child table
val attrs = aggExpColumnMapping.collect {
case (schemaAggExpModel)
if updatedAggExp
.exists(p =>
matchExpression(schemaAggExpModel.expression,
PreAggregateUtil.normalizeExprId(p, parentLogicalPlan.allAttributes))) =>
attributes filter (_.name.equalsIgnoreCase(
schemaAggExpModel.columnSchema.get.asInstanceOf[ColumnSchema].getColumnName))
}.flatten
// getting aggregate table aggregate expressions
getAggregateExpressionForAggregation(aggExp, attrs.toSeq, isStreamingTable)
}
/**
* Below method will be used to update the aggregate expression.
* 1. In case of average the below expressions will be returned.
* 1.1 Streaming table
* 1.1.1 Aggregate table
* It will return sum(expression) and count(expression)
* 1.1.2 Aggregate node added for streaming
* It will return Divide(sum(expression), count(expression))
* 1.2 Normal table
* 1.2.1 Aggregate table
* It will return Divide(sum(expression), count(expression))
* 2. In case of count, for both the aggregate table and the aggregate node added for a
* streaming table, count will be aggregated to sum
*
* @param aggExp
* aggregate expression
* @param attrs
* aggregate function Attribute, in case of average it will be two to support rollup
* @return
* aggregate expression
*/
def getAggregateExpressionForAggregation(aggExp: AggregateExpression,
attrs: Seq[AttributeReference],
isStreamingTable: Boolean = false): Seq[Expression] = {
aggExp.aggregateFunction match {
case Sum(MatchCastExpression(_, changeDataType: DataType)) =>
Seq(AggregateExpression(Sum(Cast(attrs.head, changeDataType)), aggExp.mode, false))
case Sum(_) =>
Seq(AggregateExpression(Sum(attrs.head), aggExp.mode, false))
case Max(MatchCastExpression(_, changeDataType: DataType)) =>
Seq(AggregateExpression(Max(Cast(attrs.head, changeDataType)), aggExp.mode, false))
case Max(_) =>
Seq(AggregateExpression(Max(attrs.head), aggExp.mode, false))
case Min(MatchCastExpression(_, changeDataType: DataType)) =>
Seq(AggregateExpression(Min(Cast(attrs.head, changeDataType)), aggExp.mode, false))
case Min(_) =>
Seq(AggregateExpression(Min(attrs.head), aggExp.mode, false))
// Change the count AggregateExpression to Sum as count
// is already calculated so in case of aggregate table
// we need to apply sum to get the count
case Count(Seq(expression: Expression)) =>
Seq(AggregateExpression(Sum(Cast(attrs.head, LongType)), aggExp.mode, false))
case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
// for handling Normal table case/Aggregate node added in case of streaming table
if (!isStreamingTable) {
// In case of average aggregate function select 2 columns from aggregate table
// with aggregation sum and count.
// Then add divide(sum(column with sum), sum(column with count)).
Seq(Divide(AggregateExpression(Sum(Cast(
attrs.head,
DoubleType)),
aggExp.mode,
false),
AggregateExpression(Sum(Cast(
attrs.last,
DoubleType)),
aggExp.mode,
false)))
} else {
// in case of streaming aggregate table return two aggregate function sum and count
Seq(AggregateExpression(Sum(Cast(
attrs.head,
DoubleType)),
aggExp.mode,
false),
AggregateExpression(Sum(Cast(
attrs.last,
DoubleType)),
aggExp.mode,
false))
}
case Average(exp: Expression) =>
val dataType =
if (exp.dataType.isInstanceOf[DecimalType]) {
// decimal must not go as double precision.
exp.dataType.asInstanceOf[DecimalType]
} else {
DoubleType
}
// for handling Normal table case/Aggregate node added in case of streaming table
if (!isStreamingTable) {
// In case of average aggregate function select 2 columns from aggregate table
// with aggregation sum and count.
// Then add divide(sum(column with sum), sum(column with count)).
Seq(Divide(AggregateExpression(Sum(Cast(
attrs.head,
dataType)),
aggExp.mode,
false),
AggregateExpression(Sum(Cast(
attrs.last,
dataType)),
aggExp.mode,
false)))
} else {
// streaming aggregate table case: return two aggregate functions, sum and count
Seq(AggregateExpression(Sum(Cast(
attrs.head,
dataType)),
aggExp.mode,
false),
AggregateExpression(Sum(Cast(
attrs.last,
dataType)),
aggExp.mode,
false))
}
}
}
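// A minimal sketch (attribute names are illustrative, not from any schema) of
// what the method above returns for avg(price) on a non-streaming table, given
// the two aggregate table attributes priceSum and priceCount selected for the
// average:
//
//   Divide(
//     AggregateExpression(Sum(Cast(priceSum, DoubleType)), aggExp.mode, false),
//     AggregateExpression(Sum(Cast(priceCount, DoubleType)), aggExp.mode, false))
//
// i.e. the average is recomputed from the pre-aggregated sum and count columns
// instead of from the raw data, which keeps the result correct under rollup.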
/**
* Method to get the carbon table from the parent logical relation
* @param parentLogicalRelation parent table relation
* @return carbon table
*/
def getCarbonTable(parentLogicalRelation: LogicalRelation): CarbonTable = {
parentLogicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
.carbonRelation
.metaData.carbonTable
}
/**
* Below method will be used to extract the query columns from the plan
* @param groupByExpression group by expressions
* @param aggregateExpressions aggregate expressions
* @param carbonTable parent carbon table
* @param queryColumns output set of query columns extracted from the plan
* @param aggregateExps output set of validated aggregate expressions
* @return whether the plan is valid
*/
def extractQueryColumnsFromAggExpression(groupByExpression: Seq[Expression],
aggregateExpressions: Seq[NamedExpression],
carbonTable: CarbonTable,
queryColumns: scala.collection.mutable.HashSet[QueryColumn],
aggregateExps: scala.collection.mutable.HashSet[AggregateExpression]): Boolean = {
var isValid = true
groupByExpression foreach { expression =>
extractColumnFromExpression(expression, queryColumns, carbonTable)
}
aggregateExpressions.foreach {
case attr: AttributeReference =>
queryColumns += getQueryColumn(attr.name,
carbonTable)
case Alias(attr: AttributeReference, _) =>
queryColumns += getQueryColumn(attr.name,
carbonTable)
case Alias(attr: AggregateExpression, _) =>
if (attr.isDistinct) {
isValid = false
}
val aggExp = PreAggregateUtil.validateAggregateFunctionAndGetFields(attr)
if (aggExp.nonEmpty) {
aggregateExps ++= aggExp
} else {
isValid = false
}
case Alias(expression: Expression, _) =>
if (expression.isInstanceOf[ScalaUDF] &&
expression.asInstanceOf[ScalaUDF].function.getClass.getName.equalsIgnoreCase(
"org.apache.spark.sql.execution.command.timeseries.TimeseriesFunction") &&
CarbonUtil.hasTimeSeriesDataMap(carbonTable)) {
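// timeseries UDF case: children(0) is the underlying column attribute and
// children(1) is the granularity literal, recorded as the column's
// timeseries function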
queryColumns += getQueryColumn(expression.asInstanceOf[ScalaUDF].children(0)
.asInstanceOf[AttributeReference].name,
carbonTable,
timeseriesFunction = expression.asInstanceOf[ScalaUDF].children(1).asInstanceOf[Literal]
.value.toString)
} else {
expression.transform {
case attr: AttributeReference =>
queryColumns += getQueryColumn(attr.name,
carbonTable)
attr
case attr: AggregateExpression =>
if (attr.isDistinct) {
isValid = false
}
val aggExp = PreAggregateUtil.validateAggregateFunctionAndGetFields(attr)
if (aggExp.nonEmpty) {
aggregateExps ++= aggExp
} else {
isValid = false
}
attr
}
}
}
isValid
}
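// Illustrative walk-through (table and column names are hypothetical): for a
// plan of SELECT city, avg(price) FROM sales GROUP BY city, the group by side
// contributes the "city" query column and the aggregate side contributes the
// validated aggregate expressions for avg(price); a distinct aggregate such as
// count(distinct price) marks the plan invalid, so it is not rewritten against
// the aggregate table.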
/**
* Below method will be used to get the query column object which
* will have details of the column and its properties
*
* @param columnName parent column name
* @param carbonTable parent carbon table
* @param isFilterColumn whether a filter is applied on the column
* @param timeseriesFunction timeseries function applied on the column, if any
* @return query column, or null if the column does not exist in the table
*/
def getQueryColumn(columnName: String,
carbonTable: CarbonTable,
isFilterColumn: Boolean = false,
timeseriesFunction: String = ""): QueryColumn = {
val columnSchema = carbonTable.getColumnByName(columnName.toLowerCase)
if (null == columnSchema) {
null
} else {
new QueryColumn(
columnSchema.getColumnSchema,
isFilterColumn,
timeseriesFunction.toLowerCase)
}
}
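// For example (hypothetical column name), getQueryColumn("price", carbonTable)
// wraps the "price" ColumnSchema in a QueryColumn with no filter and no
// timeseries function; an unknown column name returns null, which callers
// must check.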
}
/**
* Data loading rule class to validate and update the data loading query plan
* Rules:
* 1. Update the avg aggregate expression with two expressions: sum and count
* 2. Remove duplicate sum and count expressions already present in the plan
* @param sparkSession spark session
*/
case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
extends Rule[LogicalPlan] {
lazy val parser = new CarbonSpark2SqlParser
override def apply(plan: LogicalPlan): LogicalPlan = {
val validExpressionsMap = scala.collection.mutable.HashSet.empty[AggExpToColumnMappingModel]
val namedExpressionList = scala.collection.mutable.LinkedHashSet.empty[NamedExpression]
plan transform {
case aggregate@Aggregate(groupingExpressions,
aExp,
CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
if validateAggregateExpressions(aExp) &&
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
aExp.foreach {
case attr: AttributeReference =>
namedExpressionList += attr
case alias@Alias(_: AttributeReference, _) =>
namedExpressionList += alias
case alias@Alias(aggExp: AggregateExpression, name) =>
// get the updated expressions; avg is converted to two expressions,
// sum and count
val expressions = PreAggregateUtil.validateAggregateFunctionAndGetFields(aggExp)
// if more than one expression is returned, the original was an average
if(expressions.size > 1) {
val sumExp = PreAggregateUtil.normalizeExprId(
expressions.head,
aggregate.allAttributes)
// get the logical plan for the count expression
val countExp = PreAggregateUtil.normalizeExprId(
expressions.last,
aggregate.allAttributes)
// if a sum over the same expression is already present, do not add it to the
// named expression list; otherwise add it and record it in the set
if (!validExpressionsMap.contains(AggExpToColumnMappingModel(sumExp))) {
namedExpressionList +=
CarbonToSparkAdapter.createAliasRef(expressions.head,
name + "_ sum",
NamedExpression.newExprId,
alias.qualifier,
Some(alias.metadata),
Some(alias))
validExpressionsMap += AggExpToColumnMappingModel(sumExp)
}
// if a count over the same expression is already present, do not add it to the
// named expression list; otherwise add it and record it in the set
if (!validExpressionsMap.contains(AggExpToColumnMappingModel(countExp))) {
namedExpressionList +=
CarbonToSparkAdapter.createAliasRef(expressions.last, name + "_ count",
NamedExpression.newExprId,
alias.qualifier,
Some(alias.metadata),
Some(alias))
validExpressionsMap += AggExpToColumnMappingModel(countExp)
}
} else {
// get the logical plan for expression
val exp = PreAggregateUtil.normalizeExprId(
expressions.head,
aggregate.allAttributes)
// if the same expression is already present, do not add it to the
// named expression list; otherwise add it and record it in the set
if (!validExpressionsMap.contains(AggExpToColumnMappingModel(exp))) {
namedExpressionList+=alias
validExpressionsMap += AggExpToColumnMappingModel(exp)
}
}
case alias@Alias(_: Expression, _) =>
namedExpressionList += alias
}
groupingExpressions foreach {
case namedExpr: NamedExpression => namedExpressionList += namedExpr
case _ =>
}
aggregate.copy(aggregateExpressions = namedExpressionList.toSeq)
case plan: LogicalPlan => plan
}
}
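// A sketch of the rewrite performed above (table and column names are made up):
// a data loading plan of the shape
//   Aggregate [city], [city, avg(price) AS price_avg]
// is rewritten to
//   Aggregate [city], [city, sum(price) AS `price_avg_ sum`, count(price) AS `price_avg_ count`]
// so the aggregate table persists the sum and count needed to roll up the
// average, while duplicate sum/count expressions already in the plan are
// added only once.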
/**
* Called by CarbonPreAggregateDataLoadingRules to validate whether the plan is
* valid for applying the rules. If the plan has the PreAggLoad (loading) UDF
* and does not have the PreAgg (query) UDF then it is valid.
* @param namedExpression named expressions
* @return valid or not
*/
private def validateAggregateExpressions(namedExpression: Seq[NamedExpression]): Boolean = {
val filteredExpressions = namedExpression.filterNot(_.isInstanceOf[UnresolvedAlias])
filteredExpressions.exists { expr =>
!expr.name.equalsIgnoreCase("PreAgg") && expr.name.equalsIgnoreCase("preAggLoad")
}
}
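// For instance, a projection containing an alias named "preAggLoad" (the
// loading UDF marker) makes this return true, while a projection whose only
// marker is a "PreAgg" (query UDF) alias returns false.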
}