blob: 7e8eb9686eb1ad17d6cfbaec5313e5e1f5bcd71c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.mv.rewrite
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter}
import org.apache.spark.sql.types.DataType
import org.apache.carbondata.mv.datamap.MVHelper
import org.apache.carbondata.mv.plans.modular.{JoinEdge, Matchable, ModularPlan, _}
import org.apache.carbondata.mv.plans.modular
import org.apache.carbondata.mv.plans.modular.Flags._
import org.apache.carbondata.mv.plans.util.SQLBuilder
/** Base class for match makers that operate on [[ModularPlan]] trees. */
abstract class DefaultMatchMaker extends MatchMaker[ModularPlan]
/**
 * Base class for the match patterns in this file. It provides the pattern name and the
 * `factorOutSubsumer` rewrite shared by the concrete patterns.
 */
abstract class DefaultMatchPattern extends MatchPattern[ModularPlan] {

  /** Runtime class name of the pattern, without the trailing `$` carried by Scala objects. */
  val patternName: String = {
    val rawName = getClass.getName
    if (rawName.endsWith("$")) rawName.dropRight(1) else rawName
  }

  /**
   * Rewrites the expressions of `compensation` so that they refer to the output of `subsumer`.
   *
   * @param compensation plan sitting on top of the subsumer
   * @param subsumer     plan whose output list provides the alias references
   * @param aliasMapMain child-index to alias-name map; the name stored under key 0 is used as
   *                     the qualifier, and only when the map holds exactly one entry
   * @return the rewritten compensation plan
   */
  def factorOutSubsumer(
      compensation: ModularPlan,
      subsumer: Matchable,
      aliasMapMain: Map[Int, String]): ModularPlan = {
    // Aliased attribute -> attribute that references the alias.
    val attrToAliasRef = AttributeMap(
      subsumer.outputList.collect {
        case a @ Alias(attr: Attribute, _) => (attr, a.toAttribute)
      })

    // Aliased non-aggregate expression -> attribute that references the alias.
    val exprToAliasRef = subsumer.outputList.collect {
      case a @ Alias(child: Expression, _) if !child.isInstanceOf[AggregateExpression] =>
        child -> a.toAttribute
    }.toMap

    // Swap every aliased attribute/expression for its alias reference, leaving plans marked
    // `skip` and the subsumer itself untouched.
    val withAliasRefs = compensation.transform {
      case plan if !plan.skip && plan != subsumer =>
        plan.transformExpressions {
          case attr: AttributeReference =>
            attrToAliasRef.get(attr) match {
              case Some(ref) =>
                AttributeReference(ref.name, ref.dataType)(
                  exprId = ref.exprId,
                  qualifier = attr.qualifier)
              case None => attr
            }
          case expr: Expression =>
            exprToAliasRef.get(expr) match {
              case Some(ref) =>
                AttributeReference(ref.name, ref.dataType)(exprId = ref.exprId)
              case None => expr
            }
        }
    }

    val subsumerAttributes = SQLBuilder.collectAttributeSet(subsumer.outputList)
    if (SQLBuilder.collectDuplicateNames(subsumerAttributes).nonEmpty) {
      // NOTE(review): this exception is only constructed, never thrown, so duplicate names are
      // silently accepted. Confirm whether a `throw` was intended before adding one; it would
      // start rejecting plans that are accepted today.
      new UnsupportedOperationException(
        s"duplicate name(s): ${ subsumer.output.map(_.toString + ", ") }")
    }

    if (aliasMapMain.size != 1) {
      withAliasRefs
    } else {
      val subsumerName: Option[String] = aliasMapMain.get(0)
      // Qualify every attribute/alias that comes from the subsumer with the subsumer name.
      withAliasRefs.transformExpressions {
        case ref: Attribute if subsumerAttributes.contains(ref) =>
          AttributeReference(ref.name, ref.dataType)(exprId = ref.exprId, qualifier = subsumerName)
        case alias: Alias if subsumerAttributes.contains(alias.toAttribute) =>
          Alias(alias.child, alias.name)(exprId = alias.exprId, qualifier = subsumerName)
      }
    }
  }
}
/** Default match maker; `patterns` holds the match patterns in the order listed below. */
object DefaultMatchMaker extends DefaultMatchMaker {
  lazy val patterns = List(
    SelectSelectNoChildDelta,
    GroupbyGroupbyNoChildDelta,
    GroupbyGroupbySelectOnlyChildDelta,
    GroupbyGroupbyGroupbyChildDelta,
    SelectSelectSelectChildDelta,
    SelectSelectGroupbyChildDelta)
}
/**
* Convention:
* EmR: each of the subsumee's expressions matches one of the subsumer's expressions
* EdR: each of the subsumee's expressions can be derived from the subsumer's expressions
* RmE: each of the subsumer's expressions matches one of the subsumee's expressions
* RdE: each of the subsumer's expressions can be derived from the subsumee's expressions
*/
object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper {
/**
 * Checks whether `exprE`, taken from the subsumee select, can be obtained from the subsumer.
 *
 * A subsumee predicate is derivable when the subsumer has a semantically equal predicate or
 * `canEvaluate(exprE, subsumer)` holds. A subsumee output is derivable when `exprListR` holds
 * an alias over a semantically equal child, or holds an element for which semantic equality
 * or `canEvaluate` holds. Any other expression is not derivable.
 * `compensation` is not used by this check.
 */
private def isDerivable(
    exprE: Expression,
    exprListR: Seq[Expression],
    subsumee: ModularPlan,
    subsumer: ModularPlan,
    compensation: Option[ModularPlan]): Boolean = {
  val selectE = subsumee.asInstanceOf[Select]
  if (selectE.predicateList.contains(exprE)) {
    val predicatesR = subsumer.asInstanceOf[Select].predicateList
    predicatesR.exists(_.semanticEquals(exprE)) || canEvaluate(exprE, subsumer)
  } else if (selectE.outputList.contains(exprE)) {
    // Compare on the aliased child when the subsumee output is itself an alias.
    val target = exprE match {
      case Alias(child, _) => child
      case other => other
    }
    val matchesAliasedChild = exprListR.exists {
      case Alias(childR, _) => childR.semanticEquals(target)
      case _ => false
    }
    matchesAliasedChild ||
      exprListR.exists(exprR => exprR.semanticEquals(exprE) || canEvaluate(exprE, subsumer))
  } else {
    false
  }
}
/**
 * Returns true when `subsumer` is a `Select` with exactly one join edge of type `LeftOuter`,
 * its second child is a `HarmonizedRelation`, and that relation's tag (when defined) is part
 * of the select's output list.
 */
private def isLeftJoinView(subsumer: ModularPlan): Boolean = {
  subsumer match {
    case sel: Select
      if sel.joinEdges.length == 1 && sel.joinEdges.head.joinType == LeftOuter =>
      sel.children(1) match {
        case hDim: HarmonizedRelation =>
          hDim.tag.exists(tag => sel.outputList.contains(tag))
        case _ => false
      }
    case _ => false
  }
}
/**
 * Matches a subsumer `Select` against a subsumee `Select` when matching the children produced
 * no compensation (`compensation` must be `None`).
 *
 * Two plan shapes are handled:
 *  - every child of both selects is a `LeafNode`;
 *  - every child of both selects is a `GroupBy` (exactly one child on each side is required).
 *
 * @param subsumer     candidate plan that should cover the subsumee
 * @param subsumee     plan to be expressed on top of the subsumer
 * @param compensation compensation obtained from the children; only `None` is accepted here
 * @param rewrite      supplies a fresh alias name for the subsumer
 * @return `Seq(subsumer)` when both sides match exactly, a single plan that compensates on top
 *         of the subsumer when they match only partially, or `Nil` when there is no match
 */
def apply(subsumer: ModularPlan,
    subsumee: ModularPlan,
    compensation: Option[ModularPlan],
    rewrite: QueryRewrite): Seq[ModularPlan] = {
  (subsumer, subsumee, compensation) match {
    case (
      sel_1a @ modular.Select(_, _, _, _, _, _, _, _, _, _),
      sel_1q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None
      ) if sel_1a.children.forall { _.isInstanceOf[modular.LeafNode] } &&
           sel_1q.children.forall { _.isInstanceOf[modular.LeafNode] } =>

      // assume children (including harmonized relation) of subsumer and subsumee
      // are 1-1 correspondence.
      // Change the following two conditions to more complicated ones if we want to
      // consider things that combine extrajoin, rejoin, and harmonized relations
      // Both collections hold the children that do NOT have exactly one counterpart on the
      // other side; they must be empty for a match.
      val isUniqueRmE = subsumer.children.filter { x => subsumee.children.count {
        case relation: ModularRelation => relation.fineEquals(x)
        case other => other == x
      } != 1 }
      val isUniqueEmR = subsumee.children.filter { x => subsumer.children.count {
        case relation: ModularRelation => relation.fineEquals(x)
        case other => other == x
      } != 1 }

      // extrajoin: subsumer children missing from the subsumee;
      // rejoin: subsumee children missing from the subsumer.
      val extrajoin = sel_1a.children.filterNot { child => sel_1q.children.contains(child) }
      val rejoin = sel_1q.children.filterNot { child => sel_1a.children.contains(child) }
      val rejoinOutputList = rejoin.flatMap(_.output)

      val isPredicateRmE = sel_1a.predicateList.forall(expr =>
        sel_1q.predicateList.exists(_.semanticEquals(expr)))
      val isPredicateEmdR = sel_1q.predicateList.forall(expr =>
        isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))
      // An empty subsumee output list never matches; otherwise every subsumee output must be
      // derivable from the subsumer.
      val isOutputEdR = sel_1q.outputList.nonEmpty && sel_1q.outputList.forall(expr =>
        isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))

      if (isUniqueRmE.isEmpty && isUniqueEmR.isEmpty && extrajoin.isEmpty && isPredicateRmE &&
          isPredicateEmdR && isOutputEdR) {
        // (subsumee child index -> subsumer child index) for every subsumer child.
        val mappings = sel_1a.children.zipWithIndex.map {
          case (childr, fromIdx) if sel_1q.children.contains(childr) =>
            val toIndx = sel_1q.children.indexWhere {
              case relation: ModularRelation => relation.fineEquals(childr)
              case other => other == childr
            }
            (toIndx -> fromIdx)
        }
        val e2r = mappings.toMap
        val r2e = e2r.map(_.swap)

        // Every subsumer join edge needs an equivalent subsumee edge with the same join
        // conditions.
        val r2eJoinsMatch = sel_1a.joinEdges.forall { x =>
          (r2e.get(x.left), r2e.get(x.right)) match {
            case (Some(l), Some(r)) =>
              val mappedEdge = JoinEdge(l, r, x.joinType)
              val joinTypeEquivalent =
                if (sel_1q.joinEdges.contains(mappedEdge)) {
                  true
                } else {
                  x.joinType match {
                    case Inner | FullOuter =>
                      // The flipped edge is accepted for these join types.
                      sel_1q.joinEdges.contains(JoinEdge(r, l, x.joinType))
                    case LeftOuter if isLeftJoinView(sel_1a) =>
                      sel_1q.joinEdges.contains(JoinEdge(l, r, Inner)) ||
                      sel_1q.joinEdges.contains(JoinEdge(r, l, Inner))
                    case _ => false
                  }
                }
              if (joinTypeEquivalent) {
                val sel_1a_join = sel_1a.extractJoinConditions(
                  sel_1a.children(x.left),
                  sel_1a.children(x.right))
                val sel_1q_join = sel_1q.extractJoinConditions(
                  sel_1q.children(mappedEdge.left),
                  sel_1q.children(mappedEdge.right))
                sel_1a_join.forall(e => sel_1q_join.exists(e.semanticEquals(_))) &&
                sel_1q_join.forall(e => sel_1a_join.exists(e.semanticEquals(_)))
              } else false
            case _ => false
          }
        }

        val isPredicateEmR = sel_1q.predicateList.forall(expr =>
          sel_1a.predicateList.exists(_.semanticEquals(expr)))
        val isOutputEmR = sel_1q.outputList.forall(expr =>
          sel_1a.outputList.exists(_.semanticEquals(expr)))
        val isOutputRmE = sel_1a.outputList.forall(expr =>
          sel_1q.outputList.exists(_.semanticEquals(expr)))

        // True when the subsumer is a left-join view while the subsumee's first join edge is
        // an inner join. `headOption` keeps an edge-less subsumee from raising
        // NoSuchElementException before `r2eJoinsMatch` gets a chance to reject the match.
        val innerQueryOnLeftJoinView = isLeftJoinView(sel_1a) &&
          sel_1q.joinEdges.headOption.exists(_.joinType == Inner)
        val isLOEmLOR = !innerQueryOnLeftJoinView

        if (r2eJoinsMatch) {
          if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty && isLOEmLOR) {
            // Exact match: the subsumer can be used as is.
            Seq(sel_1a)
          } else {
            // Partial match: build a compensating select whose first child is the subsumer,
            // followed by the subsumee children that have no counterpart in it.
            val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
            val tAliasMap = new collection.mutable.HashMap[Int, String]()

            val usel_1a = sel_1a.copy(outputList = sel_1a.outputList)
            tChildren += usel_1a
            tAliasMap += (tChildren.indexOf(usel_1a) -> rewrite.newSubsumerName())

            sel_1q.children.zipWithIndex.foreach {
              case (childe, idx) =>
                if (e2r.get(idx).isEmpty) {
                  tChildren += childe
                  sel_1q.aliasMap.get(idx).foreach { x =>
                    tAliasMap += (tChildren.indexOf(childe) -> x)
                  }
                }
            }

            // Re-target the subsumee join edges at the new child list: children covered by
            // the subsumer collapse to index 0, and edges lying entirely inside the subsumer
            // are dropped.
            val tJoinEdges = sel_1q.joinEdges.flatMap {
              case JoinEdge(le, re, joinType) =>
                (e2r.get(le), e2r.get(re)) match {
                  case (Some(_), None) =>
                    Some(JoinEdge(
                      0,
                      tChildren.indexOf(sel_1q.children(re)),
                      joinType))
                  case (None, None) =>
                    Some(JoinEdge(
                      tChildren.indexOf(sel_1q.children(le)),
                      tChildren.indexOf(sel_1q.children(re)),
                      joinType))
                  case (None, Some(_)) =>
                    Some(JoinEdge(
                      tChildren.indexOf(sel_1q.children(le)),
                      0,
                      joinType))
                  case _ =>
                    None
                }
            }

            // Keep only the predicates the subsumer does not already apply; for an inner-join
            // subsumee over a left-join view, additionally require the harmonized relation's
            // tag to be non-null.
            val tPredicateList = sel_1q.predicateList.filter { p =>
              !sel_1a.predicateList.exists(_.semanticEquals(p))
            } ++ (if (innerQueryOnLeftJoinView) {
              sel_1a.children(1)
                .asInstanceOf[HarmonizedRelation].tag.map(IsNotNull(_)).toSeq
            } else {
              Seq.empty
            })

            val sel_1q_temp = sel_1q.copy(
              predicateList = tPredicateList,
              children = tChildren,
              joinEdges = tJoinEdges,
              aliasMap = tAliasMap.toMap)

            val done = factorOutSubsumer(sel_1q_temp, usel_1a, sel_1q_temp.aliasMap)
            Seq(done)
          }
        } else Nil
      } else Nil

    case (
      sel_3a @ modular.Select(_, _, _, _, _, _, _, _, _, _),
      sel_3q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None)
      if sel_3a.children.forall(_.isInstanceOf[GroupBy]) &&
         sel_3q.children.forall(_.isInstanceOf[GroupBy]) =>
      val isPredicateRmE = sel_3a.predicateList.isEmpty ||
                           sel_3a.predicateList.forall(expr =>
                             sel_3q.predicateList.exists(_.semanticEquals(expr)))
      val isPredicateEmdR = sel_3q.predicateList.isEmpty ||
                            sel_3q.predicateList.forall(expr =>
                              sel_3a.predicateList.exists(_.semanticEquals(expr)) ||
                              isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
      val isOutputEdR = sel_3q.outputList.forall(expr =>
        isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
      val isSingleChild = sel_3a.children.length == 1 && sel_3q.children.length == 1

      if (isPredicateRmE && isPredicateEmdR && isOutputEdR && isSingleChild) {
        val isPredicateEmR = sel_3q.predicateList.isEmpty ||
                             sel_3q.predicateList.forall(expr =>
                               sel_3a.predicateList.exists(_.semanticEquals(expr)))
        val isOutputRmE = sel_3a.outputList.forall(expr =>
          isDerivable(expr, sel_3q.outputList, sel_3a, sel_3q, None))
        val isOutputEmR = sel_3q.outputList.forall(expr =>
          isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))

        if (isPredicateEmR && isOutputEmR && isOutputRmE) {
          // Exact match: the subsumer can be used as is.
          Seq(sel_3a)
        } else if (isPredicateEmR && isOutputEmR) {
          // The subsumer has outputs the subsumee does not derive: put the subsumee's
          // select on top of the subsumer.
          // NOTE(review): `.get` throws NoSuchElementException for any alias in the subsumee
          // that has no alias with a semantically equal child in the subsumer output; confirm
          // that `isOutputEmR` rules this out.
          val sel_3q_exp = sel_3q.transformExpressions({
            case a: Alias => sel_3a.outputList
              .find { a1 =>
                a1.isInstanceOf[Alias] &&
                a1.asInstanceOf[Alias].child.semanticEquals(a.child)
              }.map(_.toAttribute).get
          })
          val wip = sel_3q_exp.copy(
            children = Seq(sel_3a),
            aliasMap = Seq(0 -> rewrite.newSubsumerName()).toMap)
          val done = factorOutSubsumer(wip, sel_3a, wip.aliasMap)
          Seq(done)
        } else {
          Nil
        }
      } else Nil

    case _ => Nil
  }
}
}
object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern {
/**
 * Matches a subsumer `GroupBy` against a subsumee `GroupBy` when matching the children
 * produced no compensation (`compensation` must be `None`).
 *
 *  - Same grouping on both sides and every subsumee output found in the subsumer: returns the
 *    subsumer with its outputs renamed to the subsumee's names.
 *  - Subsumee grouping contained in the subsumer grouping, but not the other way round:
 *    delegates to `Utils.tryMatch` and re-parents the resulting `GroupBy` on the subsumer.
 *  - Anything else: `Nil`.
 */
def apply(
    subsumer: ModularPlan,
    subsumee: ModularPlan,
    compensation: Option[ModularPlan],
    rewrite: QueryRewrite): Seq[ModularPlan] = {
  (subsumer, subsumee, compensation) match {
    case (gb_2a: modular.GroupBy, gb_2q: modular.GroupBy, None) =>
      val groupingEmR = gb_2q.predicateList.forall { exprE =>
        gb_2a.predicateList.exists(_.semanticEquals(exprE))
      }
      val groupingRmE = gb_2a.predicateList.forall { exprR =>
        gb_2q.predicateList.exists(_.semanticEquals(exprR))
      }
      // Aliases are compared on their children, plain expressions on themselves.
      val outputEmR = gb_2q.outputList.forall {
        case Alias(childE, _) =>
          gb_2a.outputList.exists {
            case Alias(childR, _) => childR.semanticEquals(childE)
            case plainR => plainR.semanticEquals(childE)
          }
        case plainE => gb_2a.outputList.exists(_.semanticEquals(plainE))
      }

      if (!groupingEmR) {
        Nil
      } else if (groupingRmE) {
        if (!outputEmR) {
          Nil
        } else {
          // Pair every subsumer output with its semantically equal subsumee output (falling
          // back to itself) and take over the subsumee's name where the names differ.
          val renamedOutputs = gb_2a.outputList.map { outR =>
            val outE = gb_2q.outputList.find {
              case a: Alias if outR.isInstanceOf[Alias] =>
                a.child.semanticEquals(outR.children.head)
              case a: Alias => a.child.semanticEquals(outR)
              case other => other.semanticEquals(outR)
            }.getOrElse(outR)
            if (outR.name == outE.name) {
              outR
            } else {
              outR match {
                case alias: Alias => Alias(alias.child, outE.name)(exprId = alias.exprId)
                case _ => Alias(outR, outE.name)(exprId = outE.exprId)
              }
            }
          }
          Seq(gb_2a.copy(outputList = renamedOutputs))
        }
      } else {
        // The subsumer groups by more than the subsumee (groupingRmE is false here).
        val aliasMap = AttributeMap(gb_2a.outputList.collect {
          case a: Alias => (a.toAttribute, a)
        })
        Utils.tryMatch(gb_2a, gb_2q, aliasMap).collect {
          case g: GroupBy =>
            // Does any output expression contain an aggregate function, e.g. sum(a)?
            val hasAggregate = g.outputList.exists { out =>
              out.find(node => node.isInstanceOf[AggregateExpression]).isDefined
            }
            if (hasAggregate && outputEmR) {
              // Put a copy of the group-by's child select on top of the subsumer and alias it.
              val childSelect = g.child.asInstanceOf[Select]
              val subsumerSelect = childSelect.copy(outputList = childSelect.outputList)
              val newChildren = collection.mutable.ArrayBuffer[ModularPlan](gb_2a)
              val selectOverSubsumer = childSelect.copy(
                children = newChildren,
                aliasMap = Map(0 -> rewrite.newSubsumerName()))
              val factored = factorOutSubsumer(
                selectOverSubsumer, subsumerSelect, selectOverSubsumer.aliasMap)
              g.copy(child = factored)
            } else {
              // Replace every Select child of the group-by's child with the subsumer.
              val reparented = g.child.withNewChildren(
                g.child.children.map {
                  case _: modular.Select => gb_2a
                  case other => other
                })
              g.copy(child = reparented)
            }
        }.map(Seq(_)).getOrElse(Nil)
      }

    case _ => Nil
  }
}
}
object GroupbyGroupbySelectOnlyChildDelta extends DefaultMatchPattern with PredicateHelper {
/**
 * Checks whether `exprE` can be derived from `exprListR`.
 *
 * Only expressions that appear in the subsumee's grouping list, or in the predicate list of
 * the compensation select, are considered. Such an expression is derivable when it is
 * semantically equal to an element of `exprListR`, when `canEvaluate` accepts it, or when
 * `isDerivableForUDF` accepts it.
 *
 * Throws a RuntimeException when the compensation has to be consulted but is `None`.
 * `subsumer` is not used by this check.
 */
private def isDerivable(
    exprE: Expression,
    exprListR: Seq[Expression],
    subsumee: ModularPlan,
    subsumer: ModularPlan,
    compensation: Option[ModularPlan]) = {
  val isGroupingExpr = subsumee.asInstanceOf[GroupBy].predicateList.contains(exprE)
  if (isGroupingExpr) {
    exprListR.exists(_.semanticEquals(exprE)) ||
      canEvaluate(exprE, exprListR) ||
      isDerivableForUDF(exprE, exprListR)
  } else {
    val compensationSelect = compensation
      .getOrElse(throw new RuntimeException("compensation cannot be None"))
      .asInstanceOf[Select]
    compensationSelect.predicateList.contains(exprE) && (
      canEvaluate(exprE, exprListR) ||
        exprListR.exists(_.semanticEquals(exprE)) ||
        isDerivableForUDF(exprE, exprListR))
  }
}
/**
* Fallback derivability check for expression lists that contain ScalaUDFs.
*
* org.apache.carbondata.mv.plans.MorePredicateHelper#canEvaluate checks whether
* exprE.references is a subset of AttributeSet(exprListR), which only takes the column name
* from a UDF, so it is always false for ScalaUDF.
* This method checks whether the exprE references can be derived from the list.
*
* Example:
* exprE has an attribute reference for a timeseries column like
* UDF:timeseries(projectjoindate, month), so exprE.references gives
* UDF:timeseries(projectjoindate, month).
* exprListR also has a ScalaUDF, so AttributeSet(exprListR) contains just the column name,
* like projectjoindate, and the canEvaluate method returns false.
*
* Here the names of the attributes referenced by each ScalaUDF in exprListR are looked up in
* exprE.sql instead.
*
* NOTE(review): the result depends on iteration order and on a flag carried between elements:
* - `forall` on an empty exprListR returns true, so an empty list makes everything derivable;
* - for a ScalaUDF only the last reference visited decides the flag, because each reference
* overwrites it;
* - a non-UDF element returns whatever the flag currently holds, so a non-UDF first element
* yields false.
* Confirm that this is the intended contract before relying on it.
*/
private def isDerivableForUDF(exprE: Expression, exprListR: Seq[Expression]): Boolean = {
// Carried across elements of exprListR; see the NOTE(review) above.
var canBeDerived = false
exprListR.forall {
case a: ScalaUDF =>
// The inner `a` shadows the UDF: it is one of the attributes the UDF references.
a.references.foreach { a =>
canBeDerived = exprE.sql.contains(a.name)
}
canBeDerived
case _ =>
canBeDerived
}
}
/**
* Matches a subsumer `GroupBy` against a subsumee `GroupBy` when matching the children produced
* a `Select` compensation, and neither group-by carries the EXPAND flag.
*
* When the subsumee grouping and the compensation predicates are derivable from the subsumer
* grouping (plus the output of the compensation's remaining children), the compensation select
* is re-parented on the subsumer and, when there is nothing to rejoin, a group-by obtained
* from `Utils.tryMatch` is placed on top of it.
*
* @return a one-element Seq holding the rewritten plan, or `Nil` when there is no match
*/
def apply(
subsumer: ModularPlan,
subsumee: ModularPlan,
compensation: Option[ModularPlan],
rewrite: QueryRewrite): Seq[ModularPlan] = {
// NOTE(review): `collect` runs on the expression sequence itself, so presumably only
// top-level entries are inspected and aggregates wrapped in an Alias are not seen; when
// nothing is collected the `forall` is vacuously true. Confirm.
val aggInputEinR = subsumee.expressions
.collect { case agg: aggregate.AggregateExpression => AttributeSet(Seq(agg))
.subsetOf(subsumer.outputSet)
}.forall(identity)
// NOTE(review): the collected elements are `Class` instances while `modular.GroupBy` denotes
// the companion object, so `contains` looks like it can never be true and this flag always
// evaluates to true; `classOf[modular.GroupBy]` may have been intended. Confirm before
// changing: a stricter check would reject compensations that are accepted today.
val compensationSelectOnly = !compensation.map { _.collect { case n => n.getClass } }
.exists(_.contains(modular.GroupBy))
(subsumer, subsumee, compensation, aggInputEinR, compensationSelectOnly) match {
case (
gb_2a @ modular.GroupBy(_, _, _, _, _, _, _, _),
gb_2q @ modular.GroupBy(_, _, _, _, _, _, _, _),
Some(sel_1c1 @ modular.Select(_, _, _, _, _, _, _, _, _, _)),
true,
true)
if !gb_2q.flags.hasFlag(EXPAND) && !gb_2a.flags.hasFlag(EXPAND) =>
// Output of every compensation child except the first one.
val rejoinOutputList = sel_1c1.children.tail.flatMap(_.output)
val isGroupingEdR = gb_2q.predicateList.forall(expr =>
isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q, gb_2a, compensation))
// Regrouping is needed when the subsumer groups by something the subsumee does not.
val needRegrouping = !gb_2a.predicateList.forall(gb_2q.predicateList.contains)
val canPullup = sel_1c1.predicateList.forall(expr =>
isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q, gb_2a, compensation))
// NOTE(review): only output entries that are themselves AggregateExpression are checked;
// presumably aliased aggregates are skipped, making this vacuously true — confirm.
val isAggEmR = gb_2q.outputList.collect {
case agg: aggregate.AggregateExpression =>
gb_2a.outputList.exists(_.semanticEquals(agg))
}.forall(identity)
if (isGroupingEdR && ((!needRegrouping && isAggEmR) || needRegrouping) && canPullup) {
// pull up
val pullupOutputList = gb_2a.outputList.map(_.toAttribute) ++ rejoinOutputList
// Subsumer outputs that also appear in the subsumee output (aliases compared on their
// children), plus the rejoin output.
val myOutputList = gb_2a.outputList.filter {
case alias: Alias => gb_2q.outputList.filter(_.isInstanceOf[Alias])
.exists(_.asInstanceOf[Alias].child.semanticEquals(alias.child))
case attr: Attribute => gb_2q.outputList.exists(_.semanticEquals(attr))
}.map(_.toAttribute) ++ rejoinOutputList
// TODO: find out if we really need to check needRegrouping or just use myOutputList
// Both variants replace every Select child of the compensation with the subsumer; they
// differ only in the output list.
val sel_2c1 = if (needRegrouping) {
sel_1c1
.copy(outputList = pullupOutputList,
inputList = pullupOutputList,
children = sel_1c1.children
.map { _ match { case s: modular.Select => gb_2a; case other => other } })
} else {
sel_1c1
.copy(outputList = myOutputList,
inputList = pullupOutputList,
children = sel_1c1.children
.map { _ match { case s: modular.Select => gb_2a; case other => other } })
}
// sel_1c1.copy(outputList = pullupOutputList, inputList = pullupOutputList, children =
// sel_1c1.children.map { _ match { case s: modular.Select => gb_2a; case other =>
// other } })
if (rejoinOutputList.isEmpty) {
val aliasMap = AttributeMap(gb_2a.outputList.collect {
case a: Alias => (a.toAttribute, a)
})
val res =
Utils.tryMatch(gb_2a, gb_2q, aliasMap).flatMap {
case g: GroupBy =>
// Check whether any aggregate function exists in the output list. For expressions like
// sum(a)+sum(b) the output list is rewritten to use aliases in place of the functions,
// so the group-by clause should be removed in those cases.
val aggFunExists = g.outputList.exists { f =>
f.find {
case ag: AggregateExpression => true
case _ => false
}.isDefined
}
if (aggFunExists) {
Some(g.copy(child = sel_2c1))
} else {
// Remove group by clause.
Some(g.copy(child = sel_2c1, predicateList = Seq.empty))
}
case _ => None
}.map { wip =>
factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap)
}.map(Seq(_))
.getOrElse(Nil)
res
}
// TODO: implement regrouping with 1:N rejoin (rejoin tables being the "1" side)
// via catalog service
else if (!needRegrouping && isAggEmR) {
Seq(sel_2c1).map(wip => factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap))
} else Nil
} else Nil
case _ => Nil
}
}
}
/**
 * Placeholder pattern for a compensation that contains at least one `GroupBy`.
 * The matching logic is not implemented yet, so `apply` always returns `Nil`.
 */
object GroupbyGroupbyGroupbyChildDelta extends DefaultMatchPattern {
  def apply(
      subsumer: ModularPlan,
      subsumee: ModularPlan,
      compensation: Option[ModularPlan],
      rewrite: QueryRewrite): Seq[ModularPlan] = {
    val compensationHasGroupBy =
      compensation.exists(plan => plan.collect { case g: GroupBy => g }.nonEmpty)
    (subsumer, subsumee) match {
      case (_: modular.Select, _: modular.Select) if compensationHasGroupBy =>
        // TODO: implement me
        Nil
      case _ => Nil
    }
  }
}
/**
 * Placeholder pattern for a subsumer/subsumee pair of `Select`s whose compensation contains
 * no `GroupBy`. The matching logic is not implemented yet, so `apply` always returns `Nil`.
 */
object SelectSelectSelectChildDelta extends DefaultMatchPattern {
  def apply(
      subsumer: ModularPlan,
      subsumee: ModularPlan,
      compensation: Option[ModularPlan],
      rewrite: QueryRewrite): Seq[ModularPlan] = {
    // Look for GroupBy nodes by type. Comparing the collected `Class` values against the
    // `modular.GroupBy` companion object can never succeed, which made this flag always true.
    val compensationSelectOnly =
      !compensation.exists(plan => plan.collect { case g: modular.GroupBy => g }.nonEmpty)
    (subsumer, subsumee, compensationSelectOnly) match {
      case (
        modular.Select(_, _, _, _, _, _, _, _, _, _),
        modular.Select(_, _, _, _, _, _, _, _, _, _),
        true) =>
        // TODO: implement me
        Nil
      case _ => Nil
    }
  }
}
object SelectSelectGroupbyChildDelta extends DefaultMatchPattern with PredicateHelper {
/** Forwards to `Utils.isDerivable` with the arguments unchanged. */
private def isDerivable(
    exprE: Expression,
    exprListR: Seq[Expression],
    subsumee: ModularPlan,
    subsumer: ModularPlan,
    compensation: Option[ModularPlan]) =
  Utils.isDerivable(exprE, exprListR, subsumee, subsumer, compensation)
/**
 * Matches a subsumer `Select` over a single `GroupBy` against a subsumee `Select` over a
 * single `GroupBy`, when matching the children produced a `GroupBy` compensation.
 *
 * The subsumer select must have an empty third constructor field, and both children lists
 * must be single-element Lists. When all predicates and outputs involved are derivable from
 * the subsumer output (plus the output of leaf nodes only the subsumee has), the compensation
 * is pulled up on top of the subsumer and the subsumee select is placed above it.
 *
 * @return a one-element Seq holding the rewritten plan, or `Nil` when there is no match
 */
def apply(
    subsumer: ModularPlan,
    subsumee: ModularPlan,
    compensation: Option[ModularPlan],
    rewrite: QueryRewrite): Seq[ModularPlan] = {
  (subsumer, subsumee, compensation, subsumer.children, subsumee.children) match {
    case (
      sel_3a @ modular.Select(
        _, _, Nil, _, _,
        Seq(modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
      sel_3q @ modular.Select(
        _, _, _, _, _,
        Seq(modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
      Some(gb_2c @ modular.GroupBy(_, _, _, _, _, _, _, _)),
      _ :: Nil,
      _ :: Nil) =>
      val leavesR = sel_3a.collect { case leaf: modular.LeafNode => leaf }
      val leavesE = sel_3q.collect { case leaf: modular.LeafNode => leaf }
      // extrajoin: leaves only the subsumer has; rejoin: leaves only the subsumee has.
      val extrajoin = leavesR.filterNot(leavesE.contains)
      val rejoin = leavesE.filterNot(leavesR.contains)
      val candidatesR = sel_3a.outputList ++ rejoin.flatMap(_.output)

      // Every derivability check below uses the same candidate list and plans.
      def derivable(expr: Expression): Boolean =
        isDerivable(expr, candidatesR, sel_3q, sel_3a, compensation)

      val isPredicateRmE = sel_3a.predicateList.forall { exprR =>
        sel_3q.predicateList.exists(_.semanticEquals(exprR)) ||
          gb_2c.predicateList.exists(_.semanticEquals(exprR))
      }
      val isPredicateEmdR = sel_3q.predicateList.forall { exprE =>
        sel_3a.predicateList.exists(_.semanticEquals(exprE)) || derivable(exprE)
      }
      val isOutputEdR = sel_3q.outputList.forall(derivable)
      val canSELPullup = gb_2c.child match {
        case childSelect: Select => childSelect.predicateList.forall(derivable)
        case _ => false
      }
      val canGBPullup = gb_2c.predicateList.forall(derivable)

      val matched = extrajoin.isEmpty && isPredicateRmE && isPredicateEmdR &&
        isOutputEdR && canSELPullup && canGBPullup

      if (!matched) {
        Nil
      } else {
        gb_2c.child match {
          case s: Select =>
            // Re-parent the compensation: its GroupBy children become the subsumer, which
            // is marked to be skipped.
            val sel_3c1 = s.withNewChildren(
              s.children.map {
                case _: GroupBy => sel_3a.setSkip()
                case other => other
              })
            val gb_3c2 = gb_2c.copy(child = sel_3c1)

            // Replace attributes that refer to compensation aliases by the aliases
            // themselves, going through AliasWrapper in between.
            val wrappedAliases = AttributeMap(
              gb_2c.outputList.collect {
                case a: Alias => (a.toAttribute, AliasWrapper(a))
              })
            val sel_3q_exp = sel_3q
              .transformExpressions {
                case attr: Attribute if wrappedAliases.contains(attr) => wrappedAliases(attr)
              }
              .transformExpressions {
                case AliasWrapper(alias: Alias) => alias
              }

            // Pair each output with the semantically equal compensation output; when none
            // is found, the compensation output at the same position is used.
            val mappings = sel_3q_exp.outputList.zipWithIndex.map { case (exp, index) =>
              val counterpart = gb_2c.outputList.find {
                case a: Alias if exp.isInstanceOf[Alias] =>
                  a.child.semanticEquals(exp.children.head)
                case a: Alias => a.child.semanticEquals(exp)
                case other => other.semanticEquals(exp)
              }.getOrElse(gb_2c.outputList(index))
              (exp, counterpart)
            }
            val oList = MVHelper.createAliases(mappings)

            val wip = sel_3q_exp.copy(outputList = oList, children = Seq(gb_3c2))
            Seq(factorOutSubsumer(wip, sel_3a, s.aliasMap))

          case _ => Nil
        }
      }

    case _ => Nil
  }
}
/**
* Temporary expression that carries an `Alias` while this object's `apply` substitutes
* attributes with the aliases that define them; the wrapper is unwrapped again right after.
*
* NOTE(review): `child` is deliberately null, presumably so that expression transformations
* do not descend into the wrapped alias — confirm. `doGenCode` returns `ev` unchanged.
*/
case class AliasWrapper(alias: Alias) extends UnaryExpression {
override def child: Expression = null
override protected def doGenCode(ctx: CodegenContext,
ev: ExprCode): ExprCode = ev
// The wrapper reports the data type of the alias it carries.
override def dataType: DataType = alias.dataType
}
}