/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.optimizer

import java.util
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks.{break, breakable}

import org.apache.spark.sql.{CarbonToSparkAdapter, SparkSession}
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.unsafe.types.UTF8String

import org.apache.carbondata.core.preagg.TimeSeriesFunctionEnum
import org.apache.carbondata.mv.expressions.modular.{ModularSubquery, ScalarModularSubquery}
import org.apache.carbondata.mv.plans.modular.{ExpressionHelper, GroupBy, HarmonizedRelation, LeafNode, Matchable, ModularPlan, ModularRelation, Select, SimpleModularizer}
import org.apache.carbondata.mv.plans.util.BirdcageOptimizer
import org.apache.carbondata.view.{MVCatalogInSpark, MVPlanWrapper, MVTimeGranularity, TimeSeriesFunction}
/**
 * The primary workflow for rewriting relational queries using Spark libraries.
 * Designed to allow easy access to the intermediate phases of query rewrite for developers.
 *
 * While this is not a public class, we should avoid changing the method names merely for
 * the sake of changing them, because many developers use this feature for debugging.
 */
class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
session: SparkSession) {
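// A minimal usage sketch (illustrative; assumes `catalog` is an MVCatalogInSpark already
// populated with MV schemas and `analyzedPlan` is the analyzed plan of the user query):
//   val rewrite = new MVRewrite(catalog, analyzedPlan, session)
//   val plan: ModularPlan = rewrite.rewrittenPlan // MV-backed plan, if any view matched
//   val sql: String = rewrite.toCompactSQL        // the rewritten query as compact SQL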
private def getAliasName(expression: NamedExpression): String = {
expression match {
case Alias(_, name) => name
case reference: AttributeReference => reference.name
}
}
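// Illustrative sketch of the mapping built by getAliasMap below, where outputList1 comes
// from the MV table plan and outputList2 from the user query. For a query output
// `sum(age) AS s`, the map would contain roughly:
//   AttributeKey(sum(age)) -> Alias(<mv column>, "s")
// so the rewriter can replace query-side expressions with MV table columns by name.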
private def getAliasMap(
outputList1: Seq[NamedExpression],
outputList2: Seq[NamedExpression]): Map[AttributeKey, NamedExpression] = {
// When an MV is created with duplicate columns, e.g. SELECT sum(age), sum(age) FROM table,
// outputList2 will contain duplicates, so handle that case here.
if (outputList1.length == outputList2.groupBy(_.name).size) {
outputList2.zip(outputList1).flatMap {
case (output1, output2) =>
var aliasMapping = output1 collect {
case reference: AttributeReference =>
(AttributeKey(reference), Alias(output2, reference.name)(exprId = output2.exprId))
}
output1 match {
case alias: Alias =>
aliasMapping = Seq((AttributeKey(alias.child),
Alias(output2, alias.name)(exprId = output2.exprId))) ++ aliasMapping
case _ =>
}
Seq((AttributeKey(output1),
Alias(output2, output1.name)(exprId = output2.exprId))) ++ aliasMapping
}.toMap
} else {
throw new UnsupportedOperationException("Cannot create mapping with unequal sizes")
}
}
private def getUpdatedOutputList(outputList: Seq[NamedExpression],
modularPlan: Option[ModularPlan]): Seq[NamedExpression] = {
modularPlan.collect {
case planWrapper: MVPlanWrapper =>
val viewSchema = planWrapper.viewSchema
val viewColumnsOrderMap = viewSchema.getColumnsOrderMap
if (null != viewColumnsOrderMap && !viewColumnsOrderMap.isEmpty) {
val updatedOutputList = new util.ArrayList[NamedExpression]()
var i = 0
while (i < viewColumnsOrderMap.size()) {
val updatedOutput = outputList.filter(
output => output.name.equalsIgnoreCase(viewColumnsOrderMap.get(i))
).head
updatedOutputList.add(updatedOutput)
i = i + 1
}
updatedOutputList.asScala
} else {
outputList
}
case _ => outputList
}.get
}
/**
 * Check whether the modular plan can be rolled up, by rewriting and matching it
 * against the existing MV datasets.
 *
 * @param modularPlan the plan to check for roll-up
 * @return the new modular plan
 */
private def getRolledUpModularPlan(modularPlan: ModularPlan): Option[ModularPlan] = {
// check if the modular plan contains a timeseries UDF
val timeSeriesFunction = modularPlan match {
case select: Select =>
getTimeSeriesFunction(select.outputList)
case groupBy: GroupBy =>
getTimeSeriesFunction(groupBy.outputList)
case _ => (null, null)
}
// set canRollUp to false for join queries and when a filter contains the timeseries UDF
// TODO: support rollUp for join queries
var canRollUp = true
optimizedPlan.transformDown {
case join@Join(_, _, _, _) =>
canRollUp = false
join
case filter@Filter(condition: Expression, _) =>
condition.collect {
case function: ScalaUDF if function.function.isInstanceOf[TimeSeriesFunction] =>
canRollUp = false
}
filter
}
if (null != timeSeriesFunction._2 && canRollUp) {
// check for roll-up and rewrite the plan:
// collect all the datasets which contain timeseries MVs
val viewSchemaWrappers = catalog.lookupFeasibleSchemas(modularPlan)
var granularityList: java.util.List[TimeSeriesFunctionEnum] =
new java.util.ArrayList[TimeSeriesFunctionEnum]()
// Get all the lower granularities for the query from datasets
viewSchemaWrappers.foreach { viewSchemaWrapper =>
if (viewSchemaWrapper.viewSchema.isTimeSeries) {
viewSchemaWrapper.logicalPlan.transformExpressions {
case alias@Alias(function: ScalaUDF, _) =>
if (function.function.isInstanceOf[TimeSeriesFunction]) {
val granularity =
function.children.last.asInstanceOf[Literal].toString().toUpperCase()
if (MVTimeGranularity.get(timeSeriesFunction._2.toUpperCase).seconds >
MVTimeGranularity.get(granularity).seconds) {
granularityList.add(TimeSeriesFunctionEnum.valueOf(granularity))
}
}
alias
}
}
}
if (!granularityList.isEmpty) {
granularityList = granularityList.asScala.sortBy(_.getOrdinal)(Ordering[Int].reverse).asJava
var originalTable: String = null
// get the table queried
modularPlan.collect {
case relation: ModularRelation =>
originalTable = relation.tableName
}
var queryGranularity: String = null
var rewrittenPlan = modularPlan
// replace granularities in the plan and check whether the plan can be changed
breakable {
granularityList.asScala.foreach { func =>
rewrittenPlan = rewriteGranularityInPlan(modularPlan, func.getName)
rewrittenPlans.add(rewrittenPlan)
val logicalPlan = session.sql(rewrittenPlan.asCompactSQL).queryExecution.optimizedPlan
rewrittenPlans.clear()
var rolledUpTable: String = null
logicalPlan.collect {
case relation: LogicalRelation =>
rolledUpTable = relation.catalogTable.get.identifier.table
case relation: HiveTableRelation =>
rolledUpTable = relation.tableMeta.identifier.table
}
if (!rolledUpTable.equalsIgnoreCase(originalTable)) {
queryGranularity = func.getName
break()
}
}
}
if (null != queryGranularity) {
// rewrite the plan and mark it as the rolled-up plan
val rolledUpPlan = rewriteWithSchemaWrapper0(rewrittenPlan)
if (rolledUpPlan.isDefined) {
rolledUpPlan.get.map(_.setRolledUp())
rolledUpPlan
} else {
None
}
} else {
None
}
} else {
None
}
} else {
None
}
}
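// Illustrative roll-up walk-through (table and MV names are hypothetical): for a
// 'day'-granularity query over `maintable` when only an 'hour'-granularity MV exists,
// granularityList collects HOUR (strictly lower than DAY), the plan's granularity literal
// is rewritten to 'hour', and if the re-optimized SQL no longer reads `maintable`, the
// rewritten plan is matched once more against the MV catalog and marked as rolled up.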
private def getTimeSeriesFunction(outputList: scala.Seq[NamedExpression]): (String, String) = {
outputList.collect {
case Alias(function: ScalaUDF, name) =>
if (function.function.isInstanceOf[TimeSeriesFunction]) {
function.children.collect {
case literal: Literal =>
return (name, literal.value.toString)
}
}
}
(null, null)
}
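// For an output list containing `timeseries(projectjoindate, 'hour') AS ts`, the method
// above returns ("ts", "hour"); (null, null) means no timeseries UDF was found.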
/**
 * Rewrite the updated MV query with the corresponding MV table.
 */
private def rewrite(modularPlan: ModularPlan): ModularPlan = {
if (modularPlan.find(_.rewritten).isDefined) {
var updatedPlan = modularPlan transform {
case select: Select =>
updatePlan(select)
case groupBy: GroupBy =>
updatePlan(groupBy)
}
if (modularPlan.isRolledUp) {
// If the rewritten query is rolled up, then rewrite the query based on the original modular
// plan. Make a new outputList based on original modular plan and wrap rewritten plan with
// select & group-by nodes with new outputList.
// For example:
// Given the user query:
// SELECT timeseries(col,'day') FROM maintable GROUP BY timeseries(col,'day')
// If the plan is rewritten as per the 'hour' granularity of mv1,
// then the rewritten query will be:
// SELECT mv1.`UDF:timeseries_projectjoindate_hour` AS `UDF:timeseries
// (projectjoindate, hour)`
// FROM
// default.mv1
// GROUP BY mv1.`UDF:timeseries_projectjoindate_hour`
//
// Now, rewrite the rewritten plan as per the 'day' granularity
// SELECT timeseries(gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)`,'day' ) AS
// `UDF:timeseries(projectjoindate, day)`
// FROM
// (SELECT mv2.`UDF:timeseries_projectjoindate_hour` AS `UDF:timeseries
// (projectjoindate, hour)`
// FROM
// default.mv2
// GROUP BY mv2.`UDF:timeseries_projectjoindate_hour`) gen_subsumer_0
// GROUP BY timeseries(gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)`,'day' )
harmonizedPlan match {
case select: Select =>
val outputList = select.outputList
val rolledUpOutputList = updatedPlan.asInstanceOf[Select].outputList
var finalOutputList: Seq[NamedExpression] = Seq.empty
val mapping = outputList zip rolledUpOutputList
val newSubsume = subqueryNameGenerator.newSubsumerName()
mapping.foreach { outputLists =>
val name: String = getAliasName(outputLists._2)
outputLists._1 match {
case a@Alias(scalaUdf: ScalaUDF, aliasName) =>
if (scalaUdf.function.isInstanceOf[TimeSeriesFunction]) {
val newName = newSubsume + ".`" + name + "`"
val transformedUdf = updateTimeSeriesFunction(scalaUdf, newName)
finalOutputList = finalOutputList.:+(Alias(transformedUdf, aliasName)(a.exprId,
a.qualifier).asInstanceOf[NamedExpression])
}
case Alias(attr: AttributeReference, _) =>
finalOutputList = finalOutputList.:+(
CarbonToSparkAdapter.createAttributeReference(attr, name, newSubsume))
case attr: AttributeReference =>
finalOutputList = finalOutputList.:+(
CarbonToSparkAdapter.createAttributeReference(attr, name, newSubsume))
}
}
val newChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
val newAliasMap = new collection.mutable.HashMap[Int, String]()
val sel_plan = select.copy(outputList = finalOutputList,
inputList = finalOutputList,
predicateList = Seq.empty)
newChildren += sel_plan
newAliasMap += (newChildren.indexOf(sel_plan) -> newSubsume)
updatedPlan = select.copy(outputList = finalOutputList,
inputList = finalOutputList,
aliasMap = newAliasMap.toMap,
predicateList = Seq.empty,
children = Seq(updatedPlan)).setRewritten()
case groupBy: GroupBy =>
updatedPlan match {
case select: Select =>
val selectOutputList = groupBy.outputList
val rolledUpOutputList = updatedPlan.asInstanceOf[Select].outputList
var finalOutputList: Seq[NamedExpression] = Seq.empty
var predicateList: Seq[Expression] = Seq.empty
val mapping = selectOutputList zip rolledUpOutputList
val newSubsume = subqueryNameGenerator.newSubsumerName()
mapping.foreach { outputLists =>
val aliasName: String = getAliasName(outputLists._2)
outputLists._1 match {
case a@Alias(scalaUdf: ScalaUDF, _) =>
if (scalaUdf.function.isInstanceOf[TimeSeriesFunction]) {
val newName = newSubsume + ".`" + aliasName + "`"
val transformedUdf = updateTimeSeriesFunction(scalaUdf, newName)
groupBy.predicateList.foreach {
case _: ScalaUDF =>
predicateList = predicateList.:+(transformedUdf)
case attr: AttributeReference =>
predicateList = predicateList.:+(
CarbonToSparkAdapter.createAttributeReference(attr,
attr.name,
newSubsume))
}
finalOutputList = finalOutputList.:+(Alias(transformedUdf, a.name)(a
.exprId, a.qualifier).asInstanceOf[NamedExpression])
}
case attr: AttributeReference =>
finalOutputList = finalOutputList.:+(
CarbonToSparkAdapter.createAttributeReference(attr, aliasName, newSubsume))
case Alias(attr: AttributeReference, _) =>
finalOutputList = finalOutputList.:+(
CarbonToSparkAdapter.createAttributeReference(attr, aliasName, newSubsume))
case a@Alias(agg: AggregateExpression, name) =>
val newAgg = agg.transform {
case attr: AttributeReference =>
CarbonToSparkAdapter.createAttributeReference(attr, name, newSubsume)
}
finalOutputList = finalOutputList.:+(Alias(newAgg, name)(a.exprId,
a.qualifier).asInstanceOf[NamedExpression])
case other => other
}
}
val newChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
val newAliasMap = new collection.mutable.HashMap[Int, String]()
val sel_plan = select.copy(outputList = finalOutputList,
inputList = finalOutputList,
predicateList = Seq.empty)
newChildren += sel_plan
newAliasMap += (newChildren.indexOf(sel_plan) -> newSubsume)
updatedPlan = select.copy(outputList = finalOutputList,
inputList = finalOutputList,
aliasMap = newAliasMap.toMap,
children = Seq(updatedPlan)).setRewritten()
updatedPlan = groupBy.copy(outputList = finalOutputList,
inputList = finalOutputList,
predicateList = predicateList,
alias = Some(newAliasMap.mkString),
child = updatedPlan).setRewritten()
updatedPlan = select.copy(outputList = finalOutputList,
inputList = finalOutputList,
children = Seq(updatedPlan)).setRewritten()
}
}
}
updatedPlan
} else {
modularPlan
}
}
private def rewriteWithSchemaWrapper(modularPlan: ModularPlan): ModularPlan = {
val rewrittenPlan = modularPlan.transformAllExpressions {
case subquery: ModularSubquery =>
if (subquery.children.isEmpty) {
rewriteWithSchemaWrapper0(subquery.plan) match {
case Some(rewrittenPlan) =>
ScalarModularSubquery(rewrittenPlan, subquery.children, subquery.exprId)
case None =>
subquery
}
}
else {
throw new UnsupportedOperationException(s"Rewrite expression $subquery isn't supported")
}
case other => other
}
rewriteWithSchemaWrapper0(rewrittenPlan).getOrElse(rewrittenPlan)
}
private def rewriteWithSchemaWrapper0(modularPlan: ModularPlan): Option[ModularPlan] = {
val rewrittenPlan = modularPlan transformDown {
case plan =>
if (plan.rewritten || !plan.isSPJGH) {
plan
} else {
val rewrittenPlans =
for {schemaWrapper <- catalog.lookupFeasibleSchemas(plan).toStream
subsumer <- SimpleModularizer.modularize(
BirdcageOptimizer.execute(schemaWrapper.logicalPlan)).map(_.semiHarmonized)
subsumee <- unifySubsumee(plan)
rewrittenPlan <- rewriteWithSchemaWrapper0(
unifySubsumer2(
unifySubsumer1(
subsumer,
subsumee,
schemaWrapper.modularPlan),
subsumee),
subsumee)} yield {
rewrittenPlan
}
rewrittenPlans.headOption.map(_.setRewritten()).getOrElse(plan)
}
}
if (rewrittenPlan.fastEquals(modularPlan)) {
if (rewrittenPlans.asScala.exists(plan => plan.sameResult(rewrittenPlan))) {
return None
}
getRolledUpModularPlan(rewrittenPlan)
} else {
Some(rewrittenPlan)
}
}
private def rewriteWithSchemaWrapper0(
subsumer: ModularPlan,
subsumee: ModularPlan): Option[ModularPlan] = {
if (subsumer.getClass == subsumee.getClass) {
(subsumer.children, subsumee.children) match {
case (Nil, Nil) => None
case (r, e) if r.forall(_.isInstanceOf[LeafNode]) && e.forall(_.isInstanceOf[LeafNode]) =>
val matchesPlans =
MVMatchMaker.execute(subsumer, subsumee, None, subqueryNameGenerator)
if (matchesPlans.hasNext) {
Some(matchesPlans.next)
} else {
None
}
case (subsumerChild :: Nil, subsumeeChild :: Nil) =>
val compensation = rewriteWithSchemaWrapper0(subsumerChild, subsumeeChild)
val matchesPlans = compensation.map {
case comp if comp.eq(subsumerChild) =>
MVMatchMaker.execute(subsumer, subsumee, None, subqueryNameGenerator)
case _ =>
MVMatchMaker.execute(subsumer, subsumee, compensation,
subqueryNameGenerator)
}
if (matchesPlans.isDefined && matchesPlans.get.hasNext) {
Some(matchesPlans.get.next)
} else {
None
}
case _ => None
}
} else {
None
}
}
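// The two overloads above work in tandem: leaf-level children are matched directly via
// MVMatchMaker, single-child nodes first compute a compensation plan for the child and
// feed it to MVMatchMaker, and multi-child (join) shapes are not matched here.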
/**
 * Replaces the identified immediate lower-level granularity in the modular plan
 * to perform roll-up.
 *
 * @param plan the plan to be rewritten
 * @param queryGranularity the granularity to substitute
 * @return the plan with the granularity changed
 */
private def rewriteGranularityInPlan(plan: ModularPlan, queryGranularity: String) = {
val newPlan = plan.transformDown {
case p => p.transformAllExpressions {
case udf: ScalaUDF =>
if (udf.function.isInstanceOf[TimeSeriesFunction]) {
val transformedUdf = udf.transformDown {
case _: Literal =>
new Literal(UTF8String.fromString(queryGranularity.toLowerCase),
DataTypes.StringType)
}
transformedUdf
} else {
udf
}
case alias@Alias(udf: ScalaUDF, name) =>
if (udf.function.isInstanceOf[TimeSeriesFunction]) {
var literal: String = null
val transformedUdf = udf.transformDown {
case l: Literal =>
literal = l.value.toString
new Literal(UTF8String.fromString(queryGranularity.toLowerCase),
DataTypes.StringType)
}
Alias(transformedUdf,
name.replace(", " + literal, ", " + queryGranularity))(alias.exprId,
alias.qualifier).asInstanceOf[NamedExpression]
} else {
alias
}
}
}
newPlan
}
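// Example of the rewrite above, with queryGranularity = "hour":
//   timeseries(col, 'day') AS `UDF:timeseries(col, day)`
// becomes
//   timeseries(col, 'hour') AS `UDF:timeseries(col, hour)`
// i.e. both the granularity literal and the alias name are updated.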
// match the join table of subsumer and subsumee
// when the table names are the same
private def isJoinMatched(
subsumer: ModularPlan,
subsumee: ModularPlan,
subsumerTable: LeafNode,
subsumeeTable: LeafNode,
subsumerIndex: Int,
subsumeeIndex: Int): Boolean = {
(subsumerTable, subsumeeTable) match {
case (_: ModularRelation, _: ModularRelation) =>
val subsumerTableParent = subsumer.find(plan => plan.children.contains(subsumerTable)).get
val subsumeeTableParent = subsumee.find(plan => plan.children.contains(subsumeeTable)).get
(subsumerTableParent, subsumeeTableParent) match {
case (subsumerSelect: Select, subsumeeSelect: Select) =>
val intersectJoinEdges = subsumeeSelect.joinEdges intersect subsumerSelect.joinEdges
if (intersectJoinEdges.nonEmpty) {
return intersectJoinEdges.exists(
join =>
join.left == subsumerIndex &&
join.left == subsumeeIndex || join.right == subsumerIndex &&
join.right == subsumeeIndex
)
}
case _ => return false
}
case _ =>
}
true
}
// add Select operator as placeholder on top of subsumee to facilitate matching
private def unifySubsumee(subsumee: ModularPlan): Option[ModularPlan] = {
subsumee match {
case groupBy@GroupBy(_, _, _, _, Select(_, _, _, _, _, _, _, _, _, _), _, _, _) =>
Some(
Select(
groupBy.outputList,
groupBy.outputList,
Nil,
Map.empty,
Nil,
groupBy :: Nil,
groupBy.flags,
groupBy.flagSpec,
Seq.empty)
)
case other => Some(other)
}
}
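// e.g. a subsumee shaped GroupBy(Select(...)) is wrapped as Select(GroupBy(Select(...)))
// with a pass-through output list, so that subsumer and subsumee both end in a Select
// node during matching.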
// add Select operator as placeholder on top of subsumer to facilitate matching
private def unifySubsumer1(
subsumer: ModularPlan,
subsumee: ModularPlan,
modularPlan: ModularPlan): ModularPlan = {
// Attach the MV table relation to the subsumer modular plan
val updatedSubsumer = subsumer match {
// In case of ORDER BY an extra Select is added, but it can be ignored while doing selection.
case select@Select(_, _, _, _, _, Seq(g: GroupBy), _, _, _, _) =>
select.copy(
children = Seq(g.copy(modularPlan = Some(modularPlan))),
outputList = updateDuplicateColumns(select.outputList))
case select: Select =>
select.copy(
modularPlan = Some(modularPlan),
outputList = updateDuplicateColumns(select.outputList))
case groupBy: GroupBy =>
groupBy.copy(
modularPlan = Some(modularPlan),
outputList = updateDuplicateColumns(groupBy.outputList))
case other => other
}
(updatedSubsumer, subsumee) match {
case (
groupBy@GroupBy(_, _, _, _, Select(_, _, _, _, _, _, _, _, _, _), _, _, _),
Select(_, _, _, _, _,
Seq(GroupBy(_, _, _, _, Select(_, _, _, _, _, _, _, _, _, _), _, _, _)), _, _, _, _)
) =>
Select(
groupBy.outputList,
groupBy.outputList,
Nil,
Map.empty,
Nil,
groupBy :: Nil,
groupBy.flags,
groupBy.flagSpec,
Seq.empty).setSkip()
case _ => updatedSubsumer.setSkip()
}
}
private def unifySubsumer2(subsumer: ModularPlan, subsumee: ModularPlan): ModularPlan = {
val subsumerTables = subsumer.collect { case node: LeafNode => node }
val subsumeeTables = subsumee.collect { case node: LeafNode => node }
val tableMappings = for {
subsumerTableIndex <- subsumerTables.indices
subsumeeTableIndex <- subsumeeTables.indices
if subsumerTables(subsumerTableIndex) == subsumeeTables(subsumeeTableIndex) &&
isJoinMatched(
subsumer,
subsumee,
subsumerTables(subsumerTableIndex),
subsumeeTables(subsumeeTableIndex),
subsumerTableIndex,
subsumeeTableIndex
)
} yield {
(subsumerTables(subsumerTableIndex), subsumeeTables(subsumeeTableIndex))
}
tableMappings.foldLeft(subsumer) {
case (currentSubsumer, tableMapping) =>
val mappedOperator =
tableMapping._1 match {
case relation: HarmonizedRelation if relation.hasTag =>
tableMapping._2.asInstanceOf[HarmonizedRelation].addTag
case _ =>
tableMapping._2
}
val nextSubsumer = currentSubsumer.transform {
case node: ModularRelation if node.fineEquals(tableMapping._1) => mappedOperator
case tableMapping._1 if !tableMapping._1.isInstanceOf[ModularRelation] => mappedOperator
}
// reverse first due to possible tag for left join
val rewrites = AttributeMap(tableMapping._1.output.zip(mappedOperator.output))
nextSubsumer.transformUp {
case plan => plan.transformExpressions {
case attribute: Attribute if rewrites contains attribute =>
rewrites(attribute).withQualifier(attribute.qualifier)
}
}
}
}
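// After each table mapping above, the attributes of the mapped subsumer-side table are
// rewritten to the corresponding subsumee-side output attributes (keeping the original
// qualifier), so subsequent expression matching compares attributes of the same relation.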
private def updateTimeSeriesFunction(function: ScalaUDF, newName: String): Expression = {
function.transformDown {
case reference: AttributeReference =>
AttributeReference(newName, reference.dataType)(
exprId = reference.exprId,
qualifier = reference.qualifier)
case literal: Literal =>
Literal(UTF8String.fromString("'" + literal.toString() + "'"),
org.apache.spark.sql.types.DataTypes.StringType)
}
}
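// Sketch of the transformation above, where newName is the generated subsumer-qualified
// column (e.g. gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)`): the column
// reference is renamed to newName and the granularity literal is re-quoted as a string.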
private def updateDuplicateColumns(outputList: Seq[NamedExpression]): Seq[NamedExpression] = {
val duplicateColNames = outputList.groupBy(_.name).filter(_._2.length > 1).flatMap(_._2).toList
val updatedOutputList = outputList.map {
output =>
val duplicateColumn = duplicateColNames.find(name => name.semanticEquals(output))
if (duplicateColumn.isDefined) {
val attributesOfDuplicateCol = duplicateColumn.get.collect {
case reference: AttributeReference => reference
}
val attributesOfCol = output.collect {
case reference: AttributeReference => reference
}
// Here we need to check whether the duplicate columns belong to the same table, since a
// query with duplicate columns is valid; we must make sure not to rename them with the
// qualifier name defined below. For example, for an expression like
// cast(FLOOR(cast(col_name as double)) ...), even the exprId at the upper layer will be
// the same, so we find the attribute reference (col_name) at the lower level and check
// whether the exprId is the same or the attributes are of the same table, hence the
// semanticEquals check.
val qualifiedName =
output.qualifier.headOption.getOrElse(s"${output.exprId.id}_${output.name}")
if (!attributesOfDuplicateCol.forall(attribute =>
attributesOfCol.exists(a => a.semanticEquals(attribute)))) {
Alias(output, qualifiedName)(exprId = output.exprId)
} else if (output.qualifier.nonEmpty) {
Alias(output, qualifiedName)(exprId = output.exprId)
// This check handles the scenario where the column is a direct attribute reference.
// Since selecting duplicate columns is allowed, we just alias those columns and update
// them; the duplicate check above also holds for them, so they are not updated there.
} else if (duplicateColumn.get.isInstanceOf[AttributeReference] &&
output.isInstanceOf[AttributeReference]) {
Alias(output, qualifiedName)(exprId = output.exprId)
} else {
output
}
} else {
output
}
}
updatedOutputList
}
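// e.g. for duplicate output names such as `t1.age` and `t2.age`, each output is
// re-aliased with its qualifier (or with `<exprId>_age` when unqualified) so duplicate
// column names in the subsumer stay distinguishable during matching.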
/**
 * Update the modular plan as per the MV table relation inside it.
 *
 * @param modularPlan plan to be updated
 * @return updated modular plan
 */
private def updatePlan(modularPlan: ModularPlan): ModularPlan = {
modularPlan match {
case select: Select if select.modularPlan.isDefined =>
val planWrapper = select.modularPlan.get.asInstanceOf[MVPlanWrapper]
val plan = planWrapper.modularPlan.asInstanceOf[Select]
// When the output list contains multiple projections of the same column but the relation
// contains distinct columns, the column mapping may go wrong, so select distinct columns.
val updatedPlanOutputList = getUpdatedOutputList(plan.outputList, select.modularPlan)
val outputList =
for ((output1, output2) <- select.outputList.distinct zip updatedPlanOutputList) yield {
if (output1.name != output2.name) {
Alias(output2, output1.name)(exprId = output1.exprId)
} else {
output2
}
}
plan.copy(outputList = outputList).setRewritten()
case select: Select => select.children match {
case Seq(groupBy: GroupBy) if groupBy.modularPlan.isDefined =>
val planWrapper = groupBy.modularPlan.get.asInstanceOf[MVPlanWrapper]
val plan = planWrapper.modularPlan.asInstanceOf[Select]
val aliasMap = getAliasMap(plan.outputList, groupBy.outputList)
// Update the flagSpec as per the mv table attributes.
val updatedFlagSpec = updateFlagSpec(select, plan, aliasMap, keepAlias = false)
if (!planWrapper.viewSchema.isRefreshIncremental) {
val updatedPlanOutputList = getUpdatedOutputList(plan.outputList, groupBy.modularPlan)
val outputList =
for ((output1, output2) <- groupBy.outputList zip updatedPlanOutputList) yield {
if (output1.name != output2.name) {
Alias(output2, output1.name)(exprId = output1.exprId)
} else {
output2
}
}
// Directly keep the relation as child.
select.copy(
outputList = select.outputList.map {
output => outputList.find(_.name.equals(output.name)).get
},
children = Seq(plan),
aliasMap = plan.aliasMap,
flagSpec = updatedFlagSpec).setRewritten()
} else {
val child = updatePlan(groupBy).asInstanceOf[Matchable]
// First find the indices from the child output list.
val outputIndices = select.outputList.map {
output =>
groupBy.outputList.indexWhere {
case alias: Alias if output.isInstanceOf[Alias] =>
alias.child.semanticEquals(output.asInstanceOf[Alias].child)
case alias: Alias if alias.child.semanticEquals(output) =>
true
case other if output.isInstanceOf[Alias] =>
other.semanticEquals(output.asInstanceOf[Alias].child)
case other =>
other.semanticEquals(output) || other.toAttribute.semanticEquals(output)
}
}
// Get the output list from the converted child output list using the indices found above.
val outputList =
outputIndices.map(child.outputList(_)).zip(select.outputList).map {
case (output1, output2) =>
output1 match {
case alias: Alias if output2.isInstanceOf[Alias] =>
Alias(alias.child, output2.name)(exprId = output2.exprId)
case alias: Alias =>
alias
case other if output2.isInstanceOf[Alias] =>
Alias(other, output2.name)(exprId = output2.exprId)
case other =>
other
}
}
// TODO Remove the unnecessary columns from selection.
// Only keep columns which are required by parent.
select.copy(
outputList = outputList,
inputList = child.outputList,
flagSpec = updatedFlagSpec,
children = Seq(child)).setRewritten()
}
case _ => select
}
case groupBy: GroupBy if groupBy.modularPlan.isDefined =>
val planWrapper = groupBy.modularPlan.get.asInstanceOf[MVPlanWrapper]
val plan = planWrapper.modularPlan.asInstanceOf[Select]
val updatedPlanOutputList = getUpdatedOutputList(plan.outputList, groupBy.modularPlan)
val outputListMapping = groupBy.outputList zip updatedPlanOutputList
val outputList = for ((output1, output2) <- outputListMapping) yield {
output1 match {
case Alias(aggregate@AggregateExpression(function@Sum(_), _, _, _), _) =>
val uFun = function.copy(child = output2)
Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
case Alias(aggregate@AggregateExpression(function@Max(_), _, _, _), _) =>
val uFun = function.copy(child = output2)
Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
case Alias(aggregate@AggregateExpression(function@Min(_), _, _, _), _) =>
val uFun = function.copy(child = output2)
Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
case Alias(aggregate@AggregateExpression(_@Count(Seq(_)), _, _, _), _) =>
val uFun = Sum(output2)
Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
case Alias(agg@AggregateExpression(_@Corr(_, _), _, _, _), _) =>
val uFun = Sum(output2)
Alias(agg.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
case Alias(aggregate@AggregateExpression(_@VariancePop(_), _, _, _), _) =>
val uFun = Sum(output2)
Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
case Alias(aggregate@AggregateExpression(_@VarianceSamp(_), _, _, _), _) =>
val uFun = Sum(output2)
Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
case Alias(aggregate@AggregateExpression(_@StddevSamp(_), _, _, _), _) =>
val uFun = Sum(output2)
Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
case Alias(aggregate@AggregateExpression(_@StddevPop(_), _, _, _), _) =>
val uFun = Sum(output2)
Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
case Alias(aggregate@AggregateExpression(_@CovPopulation(_, _), _, _, _), _) =>
val uFun = Sum(output2)
Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
case Alias(aggregate@AggregateExpression(_@CovSample(_, _), _, _, _), _) =>
val uFun = Sum(output2)
Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
case Alias(aggregate@AggregateExpression(_@Skewness(_), _, _, _), _) =>
val uFun = Sum(output2)
Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
case Alias(aggregate@AggregateExpression(_@Kurtosis(_), _, _, _), _) =>
val uFun = Sum(output2)
Alias(aggregate.copy(aggregateFunction = uFun), output1.name)(exprId = output1.exprId)
case _ =>
if (output1.name != output2.name) {
Alias(output2, output1.name)(exprId = output1.exprId)
} else {
output2
}
}
}
val updatedPredicates = groupBy.predicateList.map {
predicate =>
outputListMapping.find {
case (output1, _) =>
output1 match {
case alias: Alias if predicate.isInstanceOf[Alias] =>
alias.child.semanticEquals(predicate.children.head)
case alias: Alias =>
alias.child.semanticEquals(predicate)
case other =>
other.semanticEquals(predicate)
}
} match {
case Some((_, output2)) => output2
case _ => predicate
}
}
groupBy.copy(
outputList = outputList,
inputList = plan.outputList,
predicateList = updatedPredicates,
child = plan,
modularPlan = None).setRewritten()
case other => other
}
}
/**
 * Updates the flagSpec of the given Select plan with the attributes of the relation's
 * Select plan.
 */
private def updateFlagSpec(
select: Select,
relation: Select,
aliasMap: Map[AttributeKey, NamedExpression],
keepAlias: Boolean): Seq[Seq[Any]] = {
val updatedFlagSpec = select.flagSpec.map {
flagSpec =>
flagSpec.map {
case list: ArrayBuffer[_] =>
list.map {
case sortOrder: SortOrder =>
val expressions =
updateOutputList(
Seq(sortOrder.child.asInstanceOf[Attribute]),
relation,
aliasMap,
keepAlias = false)
SortOrder(expressions.head, sortOrder.direction)
}
// In case of LIMIT, the flag value falls through to `other`.
case other => other
}
}
updatedFlagSpec
}
private def updateOutputList(
outputList: Seq[NamedExpression],
select: Select,
aliasMap: Map[AttributeKey, NamedExpression],
keepAlias: Boolean): Seq[NamedExpression] = {
val updatedOutputList = updateSubsumeAttributes(
outputList,
aliasMap,
Some(select.aliasMap.values.head),
keepAlias).asInstanceOf[Seq[NamedExpression]]
updatedOutputList.zip(outputList).map {
case (updatedOutput, output) =>
updatedOutput match {
case reference: AttributeReference =>
Alias(reference, output.name)(output.exprId)
case Alias(reference: AttributeReference, _) =>
Alias(reference, output.name)(output.exprId)
case other => other
}
}
}
/**
 * Updates the expressions as per the subsumer output expressions. This is needed to align
 * the expressions with the MV table relation.
 *
 * @param expressions expressions which need to be updated
 * @param attributeMap mapping from query expressions to subsumer output expressions
 * @param aliasName table alias name
 * @return updated expressions
 */
private def updateSubsumeAttributes(
expressions: Seq[Expression],
attributeMap: Map[AttributeKey, NamedExpression],
aliasName: Option[String],
keepAlias: Boolean = false): Seq[Expression] = {
def getAttribute(expression: Expression) = {
expression match {
case Alias(aggregate: AggregateExpression, _) =>
aggregate.aggregateFunction.collect {
case reference: AttributeReference =>
ExpressionHelper.createReference(reference.name,
reference.dataType,
reference.nullable,
reference.metadata,
reference.exprId,
aliasName)
}.head
case Alias(child, _) => child
case other => other
}
}
expressions.map {
case alias@Alias(agg: AggregateExpression, name) =>
attributeMap.get(AttributeKey(agg)).map {
expression =>
Alias(getAttribute(expression), name)(
alias.exprId,
alias.qualifier,
alias.explicitMetadata)
}.getOrElse(alias)
case reference: AttributeReference =>
attributeMap.get(AttributeKey(reference)).map {
expression =>
if (keepAlias) {
AttributeReference(
name = expression.name,
dataType = expression.dataType,
nullable = expression.nullable,
metadata = expression.metadata)(
exprId = expression.exprId,
qualifier = reference.qualifier)
} else {
expression
}
}.getOrElse(reference)
case alias@Alias(expression: Expression, name) =>
attributeMap.get(AttributeKey(expression)).map {
expression =>
Alias(getAttribute(expression), name)(alias.exprId, alias.qualifier,
alias.explicitMetadata)
}.getOrElse(alias)
case expression: Expression =>
attributeMap.getOrElse(AttributeKey(expression), expression)
}
}
private val subqueryNameGenerator: SubqueryNameGenerator = new SubqueryNameGenerator()
private val rewrittenPlans: java.util.Set[ModularPlan] = new java.util.HashSet[ModularPlan]()
lazy val optimizedPlan: LogicalPlan = BirdcageOptimizer.execute(logicalPlan)
lazy val modularPlan: ModularPlan = SimpleModularizer.modularize(optimizedPlan).next()
lazy val harmonizedPlan: ModularPlan = modularPlan.harmonized
lazy val rewrittenPlan: ModularPlan = rewrite(rewriteWithSchemaWrapper(harmonizedPlan))
lazy val toCompactSQL: String = rewriteWithSchemaWrapper(modularPlan).asCompactSQL
lazy val toOneLineSQL: String = rewriteWithSchemaWrapper(modularPlan).asOneLineSQL
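// Rewrite pipeline, mirroring the lazy vals above:
//   logicalPlan -> BirdcageOptimizer -> SimpleModularizer -> harmonized plan
//   -> rewriteWithSchemaWrapper (match against MV schemas) -> rewrite (swap in MV tables)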
private case class AttributeKey(expression: Expression) {
override def equals(other: Any): Boolean = {
other match {
case that: AttributeKey =>
expression.semanticEquals(that.expression)
case _ => false
}
}
// equals is based on semanticEquals, so no consistent hashCode can be derived; hardcoding
// it forces every key into one bucket, effectively turning the map into a linked list
// that is scanned via equals.
override def hashCode: Int = {
1
}
}
}
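/**
 * Generates unique subsumer aliases (gen_subsumer_0, gen_subsumer_1, ...) used to qualify
 * rewritten sub-plans.
 */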
private class SubqueryNameGenerator {
private val nextSubqueryId = new AtomicLong(0)
def newSubsumerName(): String = s"gen_subsumer_${nextSubqueryId.getAndIncrement()}"
}