| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| |
| package org.apache.carbondata.mv.rewrite |
| |
| import org.apache.spark.sql.CarbonToSparkAdapter |
| import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _} |
| import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression |
| import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} |
| import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter} |
| import org.apache.spark.sql.types.{DataType, Metadata} |
| |
| import org.apache.carbondata.mv.datamap.MVHelper |
| import org.apache.carbondata.mv.plans.modular.{JoinEdge, Matchable, ModularPlan, _} |
| import org.apache.carbondata.mv.plans.modular |
| import org.apache.carbondata.mv.plans.modular.Flags._ |
| import org.apache.carbondata.mv.plans.util.SQLBuilder |
| |
| |
| abstract class DefaultMatchMaker extends MatchMaker[ModularPlan] |
| |
| abstract class DefaultMatchPattern extends MatchPattern[ModularPlan] { |
| |
| /** Name for this pattern, automatically inferred based on class name. */ |
| val patternName: String = { |
| val className = getClass.getName |
| if (className endsWith "$") className.dropRight(1) else className |
| } |
| |
| def factorOutSubsumer( |
| compensation: ModularPlan, |
| subsumer: Matchable, |
| aliasMapMain: Map[Int, String]): ModularPlan = { |
| |
| // Create aliasMap with attribute to alias reference attribute |
| val aliasMap = AttributeMap( |
| subsumer.outputList.collect { |
| case a: Alias if a.child.isInstanceOf[Attribute] => |
| (a.child.asInstanceOf[Attribute], a.toAttribute) |
| }) |
| |
| // Create aliasMap with Expression to alias reference attribute |
| val aliasMapExp = |
| subsumer.outputList.collect { |
| case a: Alias if a.child.isInstanceOf[Expression] && |
| !a.child.isInstanceOf[AggregateExpression] => |
| a.child -> a.toAttribute |
| }.toMap |
| |
| // Check and replace all alias references with subsumer alias map references. |
| val compensation1 = compensation.transform { |
| case plan if !plan.skip && plan != subsumer => |
| plan.transformExpressions { |
| case a: AttributeReference => |
| aliasMap |
| .get(a) |
| .map { ref => |
| AttributeReference( |
| ref.name, ref.dataType)( |
| exprId = ref.exprId, |
| qualifier = a.qualifier) |
| }.getOrElse(a) |
| case a: Expression => |
| aliasMapExp |
| .get(a) |
| .map { ref => |
| AttributeReference( |
| ref.name, ref.dataType)( |
| exprId = ref.exprId) |
| }.getOrElse(a) |
| } |
| } |
| |
| val subqueryAttributeSet = SQLBuilder.collectAttributeSet(subsumer.outputList) |
| if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) { |
| new UnsupportedOperationException( |
| s"duplicate name(s): ${ subsumer.output.map(_.toString + ", ") }") |
| } |
| if (aliasMapMain.size == 1) { |
| val subsumerName: Option[String] = aliasMapMain.get(0) |
| // Replace all compensation1 attributes with refrences of subsumer attributeset |
| val compensationFinal = compensation1.transformExpressions { |
| case ref: Attribute if subqueryAttributeSet.contains(ref) => |
| CarbonToSparkAdapter.createAttributeReference( |
| ref.name, ref.dataType, nullable = true, metadata = Metadata.empty, |
| exprId = ref.exprId, qualifier = subsumerName) |
| case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) => |
| CarbonToSparkAdapter.createAliasRef( |
| alias.child, alias.name, alias.exprId, subsumerName) |
| } |
| compensationFinal |
| } else { |
| compensation1 |
| } |
| } |
| } |
| |
| object DefaultMatchMaker extends DefaultMatchMaker { |
| lazy val patterns = |
| SelectSelectNoChildDelta :: |
| GroupbyGroupbyNoChildDelta :: |
| GroupbyGroupbySelectOnlyChildDelta :: |
| GroupbyGroupbyGroupbyChildDelta :: |
| SelectSelectSelectChildDelta :: |
| SelectSelectGroupbyChildDelta :: Nil |
| } |
| |
| /** |
| * Convention: |
| * EmR: each subsumee's expression match some of subsumer's expression |
| * EdR: each subsumee's expression derive from some of subsumer's expression |
| * RmE: each subsumer's expression match some of subsumee's expression |
| * RdE: each subsumer's expression derive from some of subsumee's expression |
| */ |
| |
| object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper { |
| private def isDerivable( |
| exprE: Expression, |
| exprListR: Seq[Expression], |
| subsumee: ModularPlan, |
| subsumer: ModularPlan, |
| compensation: Option[ModularPlan]): Boolean = { |
| if (subsumee.asInstanceOf[Select].predicateList.contains(exprE)) { |
| subsumer.asInstanceOf[Select].predicateList.exists(_.semanticEquals(exprE)) || |
| canEvaluate(exprE, subsumer) |
| } else if (subsumee.asInstanceOf[Select].outputList.contains(exprE)) { |
| exprE match { |
| case a@Alias(_, _) => |
| exprListR.exists(a1 => a1.isInstanceOf[Alias] && |
| a1.asInstanceOf[Alias].child.semanticEquals(a.child)) || |
| exprListR.exists(_.semanticEquals(exprE) || canEvaluate(exprE, subsumer)) |
| case exp => |
| exprListR.exists(a1 => a1.isInstanceOf[Alias] && |
| a1.asInstanceOf[Alias].child.semanticEquals(exp)) || |
| exprListR.exists(_.semanticEquals(exprE) || canEvaluate(exprE, subsumer)) |
| } |
| } else { |
| false |
| } |
| } |
| |
| private def isLeftJoinView(subsumer: ModularPlan): Boolean = { |
| subsumer match { |
| case sel: Select |
| if sel.joinEdges.length == 1 && |
| sel.joinEdges.head.joinType == LeftOuter && |
| sel.children(1).isInstanceOf[HarmonizedRelation] => |
| val hDim = sel.children(1).asInstanceOf[HarmonizedRelation] |
| hDim.tag match { |
| case Some(tag) => sel.outputList.contains(tag) |
| case None => false |
| } |
| case _ => false |
| } |
| } |
| |
| def apply(subsumer: ModularPlan, |
| subsumee: ModularPlan, |
| compensation: Option[ModularPlan], |
| rewrite: QueryRewrite): Seq[ModularPlan] = { |
| |
| (subsumer, subsumee, compensation) match { |
| case ( |
| sel_1a @ modular.Select(_, _, _, _, _, _, _, _, _, _), |
| sel_1q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None |
| ) if sel_1a.children.forall { _.isInstanceOf[modular.LeafNode] } && |
| sel_1q.children.forall { _.isInstanceOf[modular.LeafNode] } => |
| |
| // assume children (including harmonized relation) of subsumer and subsumee |
| // are 1-1 correspondence. |
| // Change the following two conditions to more complicated ones if we want to |
| // consider things that combine extrajoin, rejoin, and harmonized relations |
| val isUniqueRmE = subsumer.children.filter { x => subsumee.children.count{ |
| case relation: ModularRelation => relation.fineEquals(x) |
| case other => other == x |
| } != 1 } |
| val isUniqueEmR = subsumee.children.filter { x => subsumer.children.count{ |
| case relation: ModularRelation => relation.fineEquals(x) |
| case other => other == x |
| } != 1 } |
| |
| val extrajoin = sel_1a.children.filterNot { child => sel_1q.children.contains(child) } |
| val rejoin = sel_1q.children.filterNot { child => sel_1a.children.contains(child) } |
| val rejoinOutputList = rejoin.flatMap(_.output) |
| |
| val isPredicateRmE = sel_1a.predicateList.forall(expr => |
| sel_1q.predicateList.exists(_.semanticEquals(expr))) |
| val isPredicateEmdR = sel_1q.predicateList.forall(expr => |
| isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None)) |
| // Check if sel_1q.outputList is non empty and then check whether |
| // it can be derivable with sel_1a otherwise for empty cases it returns true. |
| val isOutputEdR = sel_1q.outputList.nonEmpty && sel_1q.outputList.forall(expr => |
| isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None)) |
| |
| if (isUniqueRmE.isEmpty && isUniqueEmR.isEmpty && extrajoin.isEmpty && isPredicateRmE && |
| isPredicateEmdR && isOutputEdR) { |
| val mappings = sel_1a.children.zipWithIndex.map { |
| case (childr, fromIdx) if sel_1q.children.contains(childr) => |
| val toIndx = sel_1q.children.indexWhere{ |
| case relation: ModularRelation => relation.fineEquals(childr) |
| case other => other == childr |
| } |
| (toIndx -> fromIdx) |
| |
| } |
| val e2r = mappings.toMap |
| val r2e = e2r.map(_.swap) |
| val r2eJoinsMatch = sel_1a.joinEdges.forall { x => |
| (r2e.get(x.left), r2e.get(x.right)) match { |
| case (Some(l), Some(r)) => |
| val mappedEdge = JoinEdge(l, r, x.joinType) |
| val joinTypeEquivalent = |
| if (sel_1q.joinEdges.contains(mappedEdge)) { |
| true |
| } else { |
| x.joinType match { |
| case Inner | FullOuter => |
| sel_1q.joinEdges.contains(JoinEdge(r, l, x.joinType)) |
| case LeftOuter if isLeftJoinView(sel_1a) => |
| sel_1q.joinEdges.contains(JoinEdge(l, r, Inner)) || |
| sel_1q.joinEdges.contains(JoinEdge(r, l, Inner)) |
| case _ => false |
| } |
| } |
| if (joinTypeEquivalent) { |
| val sel_1a_join = sel_1a.extractJoinConditions( |
| sel_1a.children(x.left), |
| sel_1a.children(x.right)) |
| val sel_1q_join = sel_1q.extractJoinConditions( |
| sel_1q.children(mappedEdge.left), |
| sel_1q.children(mappedEdge.right)) |
| sel_1a_join.forall(e => sel_1q_join.exists(e.semanticEquals(_))) && |
| sel_1q_join.forall(e => sel_1a_join.exists(e.semanticEquals(_))) |
| } else false |
| case _ => false |
| } |
| } |
| |
| val isPredicateEmR = sel_1q.predicateList.forall(expr => |
| sel_1a.predicateList.exists(_.semanticEquals(expr))) |
| val isOutputEmR = sel_1q.outputList.forall(expr => |
| sel_1a.outputList.exists(_.semanticEquals(expr))) |
| val isOutputRmE = sel_1a.outputList.forall(expr => |
| sel_1q.outputList.exists(_.semanticEquals(expr))) |
| val isLOEmLOR = !(isLeftJoinView(sel_1a) && sel_1q.joinEdges.head.joinType == Inner) |
| |
| if (r2eJoinsMatch) { |
| if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty && isLOEmLOR) { |
| Seq(sel_1a) |
| } else { |
| // no compensation needed |
| val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]() |
| val tAliasMap = new collection.mutable.HashMap[Int, String]() |
| |
| val usel_1a = sel_1a.copy(outputList = sel_1a.outputList) |
| tChildren += usel_1a |
| tAliasMap += (tChildren.indexOf(usel_1a) -> rewrite.newSubsumerName()) |
| |
| sel_1q.children.zipWithIndex.foreach { |
| case (childe, idx) => |
| if (e2r.get(idx).isEmpty) { |
| tChildren += childe |
| sel_1q.aliasMap.get(idx).map(x => tAliasMap += (tChildren.indexOf(childe) -> x)) |
| } |
| } |
| |
| val tJoinEdges = sel_1q.joinEdges.collect { |
| case JoinEdge(le, re, joinType) => |
| (e2r.get(le), e2r.get(re)) match { |
| case (Some(lr), None) => |
| JoinEdge( |
| 0, |
| tChildren.indexOf(sel_1q.children(re)), |
| joinType) |
| case (None, None) => |
| JoinEdge( |
| tChildren.indexOf(sel_1q.children(le)), |
| tChildren.indexOf(sel_1q.children(re)), |
| joinType) |
| case (None, Some(rr)) => |
| JoinEdge( |
| tChildren.indexOf(sel_1q.children(le)), |
| 0, |
| joinType) |
| case _ => |
| null.asInstanceOf[JoinEdge] |
| } |
| } |
| val tPredicateList = sel_1q.predicateList.filter { p => |
| !sel_1a.predicateList.exists(_.semanticEquals(p)) |
| } ++ (if (isLeftJoinView(sel_1a) && |
| sel_1q.joinEdges.head.joinType == Inner) { |
| sel_1a.children(1) |
| .asInstanceOf[HarmonizedRelation].tag.map(IsNotNull(_)).toSeq |
| } else { |
| Seq.empty |
| }) |
| val sel_1q_temp = sel_1q.copy( |
| predicateList = tPredicateList, |
| children = tChildren, |
| joinEdges = tJoinEdges.filter(_ != null), |
| aliasMap = tAliasMap.toMap) |
| |
| val done = factorOutSubsumer(sel_1q_temp, usel_1a, sel_1q_temp.aliasMap) |
| Seq(done) |
| } |
| } else Nil |
| } else Nil |
| |
| case ( |
| sel_3a @ modular.Select(_, _, _, _, _, _, _, _, _, _), |
| sel_3q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None) |
| if sel_3a.children.forall(_.isInstanceOf[GroupBy]) && |
| sel_3q.children.forall(_.isInstanceOf[GroupBy]) => |
| val isPredicateRmE = sel_3a.predicateList.isEmpty || |
| sel_3a.predicateList.forall(expr => |
| sel_3q.predicateList.exists(_.semanticEquals(expr))) |
| val isPredicateEmdR = sel_3q.predicateList.isEmpty || |
| sel_3q.predicateList.forall(expr => |
| sel_3a.predicateList.exists(_.semanticEquals(expr)) || |
| isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None)) |
| val isOutputEdR = sel_3q.outputList.forall(expr => |
| isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None)) |
| val isSingleChild = sel_3a.children.length == 1 && sel_3q.children.length == 1 |
| |
| if (isPredicateRmE && isPredicateEmdR && isOutputEdR && isSingleChild) { |
| val isPredicateEmR = sel_3q.predicateList.isEmpty || |
| sel_3q.predicateList.forall(expr => |
| sel_3a.predicateList.exists(_.semanticEquals(expr))) |
| val isOutputRmE = sel_3a.outputList.forall(expr => |
| isDerivable(expr, sel_3q.outputList, sel_3a, sel_3q, None)) |
| val isOutputEmR = sel_3q.outputList.forall(expr => |
| isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None)) |
| |
| if (isPredicateEmR && isOutputEmR && isOutputRmE) { |
| Seq(sel_3a) |
| } else if (isPredicateEmR && isOutputEmR) { |
| // no compensation needed |
| val sel_3q_exp = sel_3q.transformExpressions({ |
| case a: Alias => sel_3a.outputList |
| .find { a1 => |
| a1.isInstanceOf[Alias] && |
| a1.asInstanceOf[Alias].child.semanticEquals(a.child) |
| }.map(_.toAttribute).get |
| }) |
| val wip = sel_3q_exp.copy( |
| children = Seq(sel_3a), |
| aliasMap = Seq(0 -> rewrite.newSubsumerName()).toMap) |
| val done = factorOutSubsumer(wip, sel_3a, wip.aliasMap) |
| Seq(done) |
| } else { |
| Nil |
| } |
| } else Nil |
| |
| case _ => Nil |
| } |
| } |
| |
| } |
| |
| object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern { |
| def apply( |
| subsumer: ModularPlan, |
| subsumee: ModularPlan, |
| compensation: Option[ModularPlan], |
| rewrite: QueryRewrite): Seq[ModularPlan] = { |
| (subsumer, subsumee, compensation) match { |
| case ( |
| gb_2a @ modular.GroupBy(_, _, _, _, _, _, _, _), |
| gb_2q @ modular.GroupBy(_, _, _, _, _, _, _, _), |
| None) => |
| val isGroupingEmR = gb_2q.predicateList.forall(expr => |
| gb_2a.predicateList.exists(_.semanticEquals(expr))) |
| val isGroupingRmE = gb_2a.predicateList.forall(expr => |
| gb_2q.predicateList.exists(_.semanticEquals(expr))) |
| val isOutputEmR = gb_2q.outputList.forall { |
| case a @ Alias(_, _) => |
| gb_2a.outputList.exists{ |
| case a1: Alias => a1.child.semanticEquals(a.child) |
| case exp => exp.semanticEquals(a.child) |
| } |
| case exp => gb_2a.outputList.exists(_.semanticEquals(exp)) |
| } |
| if (isGroupingEmR && isGroupingRmE) { |
| if (isOutputEmR) { |
| // Mappings of output of two plans by checking semantic equals. |
| val mappings = gb_2a.outputList.zipWithIndex.map { case(exp, index) => |
| (exp, gb_2q.outputList.find { |
| case a: Alias if exp.isInstanceOf[Alias] => |
| a.child.semanticEquals(exp.children.head) |
| case a: Alias => a.child.semanticEquals(exp) |
| case other => other.semanticEquals(exp) |
| }.getOrElse(gb_2a.outputList(index))) |
| } |
| |
| val oList = mappings.map{case (out1, out2) => |
| if (out1.name != out2.name) out1 match { |
| case alias: Alias => Alias(alias.child, out2.name)(exprId = alias.exprId) |
| case _ => Alias(out1, out2.name)(exprId = out2.exprId) |
| } else out1 |
| } |
| |
| Seq(gb_2a.copy(outputList = oList)) |
| } else { |
| Nil |
| } |
| } else { |
| val aliasMap = AttributeMap(gb_2a.outputList.collect { case a: Alias => |
| (a.toAttribute, a)}) |
| if (isGroupingEmR) { |
| Utils.tryMatch( |
| gb_2a, gb_2q, aliasMap).flatMap { |
| case g: GroupBy => |
| // Check any agg function exists on outputlist, in case of expressions like |
| // sum(a), then create new alias and copy to group by node |
| val aggFunExists = g.outputList.exists { f => |
| f.find { |
| case _: AggregateExpression => true |
| case _ => false |
| }.isDefined |
| } |
| if (aggFunExists && !isGroupingRmE && isOutputEmR) { |
| val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]() |
| val sel_1a = g.child.asInstanceOf[Select] |
| |
| val usel_1a = sel_1a.copy(outputList = sel_1a.outputList) |
| tChildren += gb_2a |
| val sel_1q_temp = sel_1a.copy( |
| predicateList = sel_1a.predicateList, |
| children = tChildren, |
| joinEdges = sel_1a.joinEdges, |
| aliasMap = Seq(0 -> rewrite.newSubsumerName()).toMap) |
| |
| val res = factorOutSubsumer(sel_1q_temp, usel_1a, sel_1q_temp.aliasMap) |
| Some(g.copy(child = res)) |
| } else { |
| Some(g.copy(child = g.child.withNewChildren( |
| g.child.children.map { |
| case modular.Select(_, _, _, _, _, _, _, _, _, _) => gb_2a; |
| case other => other |
| }))); |
| } |
| case _ => None}.map(Seq(_)).getOrElse(Nil) |
| } else { |
| Nil |
| } |
| } |
| |
| case _ => Nil |
| } |
| } |
| } |
| |
| object GroupbyGroupbySelectOnlyChildDelta extends DefaultMatchPattern with PredicateHelper { |
| private def isDerivable( |
| exprE: Expression, |
| exprListR: Seq[Expression], |
| subsumee: ModularPlan, |
| subsumer: ModularPlan, |
| compensation: Option[ModularPlan]) = { |
| if (subsumee.asInstanceOf[GroupBy].predicateList.contains(exprE)) { |
| if (exprListR.exists(_.semanticEquals(exprE)) || canEvaluate(exprE, exprListR) || |
| isDerivableForUDF(exprE, exprListR)) { |
| true |
| } else { |
| false |
| } |
| } else if (compensation.getOrElse(throw new RuntimeException("compensation cannot be None")) |
| .asInstanceOf[Select].predicateList.contains(exprE)) { |
| if (canEvaluate(exprE, exprListR) || exprListR.exists(_.semanticEquals(exprE)) || |
| isDerivableForUDF(exprE, exprListR)) { |
| true |
| } else { |
| false |
| } |
| } else { |
| false |
| } |
| } |
| |
| /** |
| * org.apache.carbondata.mv.plans.MorePredicateHelper#canEvaluate will be checking the |
| * exprE.references as subset of AttibuteSet(exprListR), which will just take the column name from |
| * UDF, so it will be always false for ScalaUDF. |
| * This method takes care of checking whether the exprE references can be derived form list |
| * |
| * Example: |
| * exprE has attribute reference for timeseries column like UDF:timeseries(projectjoindate, month) |
| * So exprE.references will give UDF:timeseries(projectjoindate, month) |
| * exprListR has ScalaUDF also, so AttributeSet(exprListR) contains just column name like |
| * projectjoindate, so canEvaluate method returns false. |
| * |
| * Here checking whether the exprListR(ScalaUDF) .sql gives UDF:timeseries(projectjoindate, month) |
| * which can be checked with exprE.references |
| */ |
| private def isDerivableForUDF(exprE: Expression, exprListR: Seq[Expression]): Boolean = { |
| var canBeDerived = false |
| exprListR.forall { |
| case a: ScalaUDF => |
| a.references.foreach { a => |
| canBeDerived = exprE.sql.contains(a.name) |
| } |
| canBeDerived |
| case _ => |
| canBeDerived |
| } |
| } |
| |
| def apply( |
| subsumer: ModularPlan, |
| subsumee: ModularPlan, |
| compensation: Option[ModularPlan], |
| rewrite: QueryRewrite): Seq[ModularPlan] = { |
| val aggInputEinR = subsumee.expressions |
| .collect { case agg: aggregate.AggregateExpression => AttributeSet(Seq(agg)) |
| .subsetOf(subsumer.outputSet) |
| }.forall(identity) |
| val compensationSelectOnly = !compensation.map { _.collect { case n => n.getClass } } |
| .exists(_.contains(modular.GroupBy)) |
| |
| (subsumer, subsumee, compensation, aggInputEinR, compensationSelectOnly) match { |
| case ( |
| gb_2a @ modular.GroupBy(_, _, _, _, _, _, _, _), |
| gb_2q @ modular.GroupBy(_, _, _, _, _, _, _, _), |
| Some(sel_1c1 @ modular.Select(_, _, _, _, _, _, _, _, _, _)), |
| true, |
| true) |
| if !gb_2q.flags.hasFlag(EXPAND) && !gb_2a.flags.hasFlag(EXPAND) => |
| |
| val rejoinOutputList = sel_1c1.children.tail.flatMap(_.output) |
| val isGroupingEdR = gb_2q.predicateList.forall(expr => |
| isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q, gb_2a, compensation)) |
| val needRegrouping = !gb_2a.predicateList.forall(gb_2q.predicateList.contains) |
| val canPullup = sel_1c1.predicateList.forall(expr => |
| isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q, gb_2a, compensation)) |
| val isAggEmR = gb_2q.outputList.collect { |
| case agg: aggregate.AggregateExpression => |
| gb_2a.outputList.exists(_.semanticEquals(agg)) |
| }.forall(identity) |
| |
| if (isGroupingEdR && ((!needRegrouping && isAggEmR) || needRegrouping) && canPullup) { |
| // pull up |
| val pullupOutputList = gb_2a.outputList.map(_.toAttribute) ++ rejoinOutputList |
| val myOutputList = gb_2a.outputList.filter { |
| case alias: Alias => gb_2q.outputList.filter(_.isInstanceOf[Alias]) |
| .exists(_.asInstanceOf[Alias].child.semanticEquals(alias.child)) |
| case attr: Attribute => gb_2q.outputList.exists(_.semanticEquals(attr)) |
| }.map(_.toAttribute) ++ rejoinOutputList |
| // TODO: find out if we really need to check needRegrouping or just use myOutputList |
| val sel_2c1 = if (needRegrouping) { |
| sel_1c1 |
| .copy(outputList = pullupOutputList, |
| inputList = pullupOutputList, |
| children = sel_1c1.children |
| .map { _ match { case s: modular.Select => gb_2a; case other => other } }) |
| } else { |
| sel_1c1 |
| .copy(outputList = myOutputList, |
| inputList = pullupOutputList, |
| children = sel_1c1.children |
| .map { _ match { case s: modular.Select => gb_2a; case other => other } }) |
| } |
| // sel_1c1.copy(outputList = pullupOutputList, inputList = pullupOutputList, children = |
| // sel_1c1.children.map { _ match { case s: modular.Select => gb_2a; case other => |
| // other } }) |
| |
| if (rejoinOutputList.isEmpty) { |
| val aliasMap = AttributeMap(gb_2a.outputList.collect { |
| case a: Alias => (a.toAttribute, a) |
| }) |
| val res = |
| Utils.tryMatch(gb_2a, gb_2q, aliasMap).flatMap { |
| case g: GroupBy => |
| |
| // Check any agg function exists on outputlist, in case of expressions like |
| // sum(a)+sum(b) , outputlist directly replaces with alias with in place of function |
| // so we should remove the groupby clause in those cases. |
| val aggFunExists = g.outputList.exists { f => |
| f.find { |
| case ag: AggregateExpression => true |
| case _ => false |
| }.isDefined |
| } |
| if (aggFunExists) { |
| Some(g.copy(child = sel_2c1)) |
| } else { |
| // Remove group by clause. |
| Some(g.copy(child = sel_2c1, predicateList = Seq.empty)) |
| } |
| case _ => None |
| }.map { wip => |
| factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap) |
| }.map(Seq(_)) |
| .getOrElse(Nil) |
| res |
| } |
| // TODO: implement regrouping with 1:N rejoin (rejoin tables being the "1" side) |
| // via catalog service |
| else if (!needRegrouping && isAggEmR) { |
| Seq(sel_2c1).map(wip => factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap)) |
| } else Nil |
| } else Nil |
| |
| case _ => Nil |
| } |
| } |
| } |
| |
| object GroupbyGroupbyGroupbyChildDelta extends DefaultMatchPattern { |
| def apply( |
| subsumer: ModularPlan, |
| subsumee: ModularPlan, |
| compensation: Option[ModularPlan], |
| rewrite: QueryRewrite): Seq[ModularPlan] = { |
| val groupbys = compensation.map { _.collect { case g: GroupBy => g } }.getOrElse(Nil).toSet |
| |
| (subsumer, subsumee, groupbys.nonEmpty) match { |
| case ( |
| modular.Select(_, _, _, _, _, _, _, _, _, _), |
| modular.Select(_, _, _, _, _, _, _, _, _, _), |
| true) => |
| // TODO: implement me |
| Nil |
| |
| case _ => Nil |
| } |
| } |
| } |
| |
| |
| object SelectSelectSelectChildDelta extends DefaultMatchPattern { |
| def apply( |
| subsumer: ModularPlan, |
| subsumee: ModularPlan, |
| compensation: Option[ModularPlan], |
| rewrite: QueryRewrite): Seq[ModularPlan] = { |
| val compensationSelectOnly = |
| !compensation |
| .map { _.collect { case n => n.getClass } } |
| .exists(_.contains(modular.GroupBy)) |
| |
| (subsumer, subsumee, compensationSelectOnly) match { |
| case ( |
| modular.Select(_, _, _, _, _, _, _, _, _, _), |
| modular.Select(_, _, _, _, _, _, _, _, _, _), |
| true) => |
| // TODO: implement me |
| Nil |
| case _ => Nil |
| } |
| } |
| } |
| |
| object SelectSelectGroupbyChildDelta extends DefaultMatchPattern with PredicateHelper { |
| private def isDerivable( |
| exprE: Expression, |
| exprListR: Seq[Expression], |
| subsumee: ModularPlan, |
| subsumer: ModularPlan, |
| compensation: Option[ModularPlan]) = { |
| Utils.isDerivable( |
| exprE: Expression, |
| exprListR: Seq[Expression], |
| subsumee: ModularPlan, |
| subsumer: ModularPlan, |
| compensation: Option[ModularPlan]) |
| } |
| |
| def apply( |
| subsumer: ModularPlan, |
| subsumee: ModularPlan, |
| compensation: Option[ModularPlan], |
| rewrite: QueryRewrite): Seq[ModularPlan] = { |
| (subsumer, subsumee, compensation, subsumer.children, subsumee.children) match { |
| case ( |
| sel_3a@modular.Select( |
| _, _, Nil, _, _, |
| Seq(gb_2a@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _), |
| sel_3q@modular.Select( |
| _, _, _, _, _, |
| Seq(gb_2q@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _), |
| Some(gb_2c@modular.GroupBy(_, _, _, _, _, _, _, _)), |
| rchild :: Nil, |
| echild :: Nil) => |
| val tbls_sel_3a = sel_3a.collect { case tbl: modular.LeafNode => tbl } |
| val tbls_sel_3q = sel_3q.collect { case tbl: modular.LeafNode => tbl } |
| |
| val extrajoin = tbls_sel_3a.filterNot(tbls_sel_3q.contains) |
| val rejoin = tbls_sel_3q.filterNot(tbls_sel_3a.contains) |
| val rejoinOutputList = rejoin.flatMap(_.output) |
| |
| val isPredicateRmE = sel_3a.predicateList.forall(expr => |
| sel_3q.predicateList.exists(_.semanticEquals(expr)) || |
| gb_2c.predicateList.exists(_.semanticEquals(expr))) |
| val isPredicateEmdR = sel_3q.predicateList |
| .forall(expr => |
| sel_3a.predicateList.exists(_.semanticEquals(expr)) || |
| isDerivable( |
| expr, |
| sel_3a.outputList ++ rejoinOutputList, |
| sel_3q, |
| sel_3a, |
| compensation)) |
| val isOutputEdR = sel_3q.outputList |
| .forall(expr => |
| isDerivable( |
| expr, |
| sel_3a.outputList ++ rejoinOutputList, |
| sel_3q, |
| sel_3a, |
| compensation)) |
| |
| val canSELPullup = gb_2c.child.isInstanceOf[Select] && |
| gb_2c.child.asInstanceOf[Select].predicateList |
| .forall(expr => |
| isDerivable( |
| expr, |
| sel_3a.outputList ++ rejoinOutputList, |
| sel_3q, |
| sel_3a, |
| compensation)) |
| val canGBPullup = gb_2c.predicateList |
| .forall(expr => |
| isDerivable( |
| expr, |
| sel_3a.outputList ++ rejoinOutputList, |
| sel_3q, |
| sel_3a, |
| compensation)) |
| |
| if (extrajoin.isEmpty && isPredicateRmE && |
| isPredicateEmdR && |
| isOutputEdR && |
| canSELPullup && |
| canGBPullup) { |
| gb_2c.child match { |
| case s: Select => |
| val sel_3c1 = s.withNewChildren( |
| s.children.map { |
| case gb: GroupBy => sel_3a.setSkip() |
| case other => other }) |
| val gb_3c2 = gb_2c.copy(child = sel_3c1) |
| |
| val aliasMap_exp = AttributeMap( |
| gb_2c.outputList.collect { |
| case a: Alias => (a.toAttribute, AliasWrapper(a)) }) |
| val sel_3q_exp = sel_3q.transformExpressions({ |
| case attr: Attribute if aliasMap_exp.contains(attr) => aliasMap_exp(attr) |
| }).transformExpressions { |
| case AliasWrapper(alias: Alias) => alias |
| } |
| // Mappings of output of two plans by checking semantic equals. |
| val mappings = sel_3q_exp.outputList.zipWithIndex.map { case(exp, index) => |
| (exp, gb_2c.outputList.find { |
| case a: Alias if exp.isInstanceOf[Alias] => |
| a.child.semanticEquals(exp.children.head) |
| case a: Alias => a.child.semanticEquals(exp) |
| case other => other.semanticEquals(exp) |
| }.getOrElse(gb_2c.outputList(index))) |
| } |
| |
| val oList = MVHelper.createAliases(mappings) |
| |
| val wip = sel_3q_exp.copy(outputList = oList, children = Seq(gb_3c2)) |
| val sel_3c3 = Some(factorOutSubsumer(wip, sel_3a, s.aliasMap)) |
| sel_3c3.map(Seq(_)).getOrElse(Nil) |
| |
| case _ => Nil |
| } |
| } else { |
| Nil |
| } |
| |
| case _ => Nil |
| } |
| } |
| |
| |
| case class AliasWrapper(alias: Alias) extends UnaryExpression { |
| override def child: Expression = null |
| |
| override protected def doGenCode(ctx: CodegenContext, |
| ev: ExprCode): ExprCode = ev |
| |
| override def dataType: DataType = alias.dataType |
| } |
| |
| } |
| |
| |