blob: 54a684e2f448dea05cc4ac72e84507b19657efe2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.mv.rewrite
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.carbondata.mv.datamap.MVUtil
import org.apache.carbondata.mv.expressions.modular._
import org.apache.carbondata.mv.plans.modular
import org.apache.carbondata.mv.plans.modular._
import org.apache.carbondata.mv.session.MVSession
private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession) {
def rewriteWithSummaryDatasets(plan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
val replaced = plan.transformAllExpressions {
case s: ModularSubquery =>
if (s.children.isEmpty) {
rewriteWithSummaryDatasetsCore(s.plan, rewrite) match {
case Some(rewrittenPlan) => ScalarModularSubquery(rewrittenPlan, s.children, s.exprId)
case None => s
}
}
else throw new UnsupportedOperationException(s"Rewrite expression $s isn't supported")
case o => o
}
rewriteWithSummaryDatasetsCore(replaced, rewrite).getOrElse(replaced)
}
def rewriteWithSummaryDatasetsCore(plan: ModularPlan,
rewrite: QueryRewrite): Option[ModularPlan] = {
val rewrittenPlan = plan transformDown {
case currentFragment =>
if (currentFragment.rewritten || !currentFragment.isSPJGH) currentFragment
else {
val compensation =
(for { dataset <- catalog.lookupFeasibleSummaryDatasets(currentFragment).toStream
subsumer <- session.sessionState.modularizer.modularize(
session.sessionState.optimizer.execute(dataset.plan)).map(_.semiHarmonized)
subsumee <- unifySubsumee(currentFragment)
comp <- subsume(
unifySubsumer2(
unifySubsumer1(
subsumer,
subsumee,
dataset.relation),
subsumee),
subsumee, rewrite)
} yield comp).headOption
compensation.map(_.setRewritten).getOrElse(currentFragment)
}
}
if (rewrittenPlan.fastEquals(plan)) {
None
} else {
Some(rewrittenPlan)
}
}
def subsume(
subsumer: ModularPlan,
subsumee: ModularPlan,
rewrite: QueryRewrite): Option[ModularPlan] = {
if (subsumer.getClass == subsumee.getClass) {
(subsumer.children, subsumee.children) match {
case (Nil, Nil) => None
case (r, e) if r.forall(_.isInstanceOf[modular.LeafNode]) &&
e.forall(_.isInstanceOf[modular.LeafNode]) =>
val iter = session.sessionState.matcher.execute(subsumer, subsumee, None, rewrite)
if (iter.hasNext) Some(iter.next)
else None
case (rchild :: Nil, echild :: Nil) =>
val compensation = subsume(rchild, echild, rewrite)
val oiter = compensation.map {
case comp if comp.eq(rchild) =>
session.sessionState.matcher.execute(subsumer, subsumee, None, rewrite)
case _ =>
session.sessionState.matcher.execute(subsumer, subsumee, compensation, rewrite)
}
oiter.flatMap { case iter if iter.hasNext => Some(iter.next)
case _ => None }
case _ => None
}
} else {
None
}
}
// add Select operator as placeholder on top of subsumee to facilitate matching
def unifySubsumee(subsumee: ModularPlan): Option[ModularPlan] = {
subsumee match {
case gb @ modular.GroupBy(_, _, _, _,
modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _) =>
Some(
Select(gb.outputList, gb.outputList, Nil, Map.empty, Nil, gb :: Nil, gb.flags,
gb.flagSpec, Seq.empty))
case other => Some(other)
}
}
// add Select operator as placeholder on top of subsumer to facilitate matching
def unifySubsumer1(
subsumer: ModularPlan,
subsumee: ModularPlan,
dataMapRelation: ModularPlan): ModularPlan = {
// Update datamap table relation to the subsumer modular plan
val mVUtil = new MVUtil
val updatedSubsumer = subsumer match {
// In case of order by it adds extra select but that can be ignored while doing selection.
case s@Select(_, _, _, _, _, Seq(g: GroupBy), _, _, _, _) =>
s.copy(children = Seq(g.copy(dataMapTableRelation = Some(dataMapRelation))),
outputList = mVUtil.updateDuplicateColumns(s.outputList))
case s: Select => s
.copy(dataMapTableRelation = Some(dataMapRelation),
outputList = mVUtil.updateDuplicateColumns(s.outputList))
case g: GroupBy => g
.copy(dataMapTableRelation = Some(dataMapRelation),
outputList = mVUtil.updateDuplicateColumns(g.outputList))
case other => other
}
(updatedSubsumer, subsumee) match {
case (r @
modular.GroupBy(_, _, _, _, modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _),
modular.Select(_, _, _, _, _,
Seq(modular.GroupBy(_, _, _, _, modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)),
_, _, _, _)
) =>
modular.Select(
r.outputList, r.outputList, Nil, Map.empty, Nil, r :: Nil, r.flags,
r.flagSpec, Seq.empty).setSkip()
case _ => updatedSubsumer.setSkip()
}
}
def unifySubsumer2(subsumer: ModularPlan, subsumee: ModularPlan): ModularPlan = {
val rtables = subsumer.collect { case n: modular.LeafNode => n }
val etables = subsumee.collect { case n: modular.LeafNode => n }
val pairs = for {
i <- rtables.indices
j <- etables.indices
if rtables(i) == etables(j) && reTablesJoinMatched(
rtables(i), etables(j), subsumer, subsumee, i, j
)
} yield (rtables(i), etables(j))
pairs.foldLeft(subsumer) {
case (curSubsumer, pair) =>
val mappedOperator =
pair._1 match {
case relation: HarmonizedRelation if relation.hasTag =>
pair._2.asInstanceOf[HarmonizedRelation].addTag
case _ =>
pair._2
}
val nxtSubsumer = curSubsumer.transform {
case node: ModularRelation if node.fineEquals(pair._1) => mappedOperator
case pair._1 if !pair._1.isInstanceOf[ModularRelation] => mappedOperator
}
// val attributeSet = AttributeSet(pair._1.output)
// reverse first due to possible tag for left join
val rewrites = AttributeMap(pair._1.output.zip(mappedOperator.output))
nxtSubsumer.transformUp {
case p => p.transformExpressions {
case a: Attribute if rewrites contains a => rewrites(a).withQualifier(a.qualifier)
}
}
}
}
// match the join table of subsumer and subsumee
// when the table names are the same
def reTablesJoinMatched(rtable: modular.LeafNode, etable: modular.LeafNode,
subsumer: ModularPlan, subsumee: ModularPlan,
rIndex: Int, eIndex: Int): Boolean = {
(rtable, etable) match {
case _: (ModularRelation, ModularRelation) =>
val rtableParent = subsumer.find(p => p.children.contains(rtable)).get
val etableParent = subsumee.find(p => p.children.contains(etable)).get
(rtableParent, etableParent) match {
case (e: Select, r: Select) =>
val intersetJoinEdges = r.joinEdges intersect e.joinEdges
if (intersetJoinEdges.nonEmpty) {
return intersetJoinEdges.exists(j => j.left == rIndex && j.left == eIndex ||
j.right == rIndex && j.right == eIndex)
}
case _ => false
}
}
true
}
}