blob: b17eea2253a07620e9c51528fc805e328a6af944 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.mv.plans.util
import java.util.concurrent.atomic.AtomicLong
import scala.collection.immutable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.carbondata.mv.expressions.modular._
import org.apache.carbondata.mv.plans._
import org.apache.carbondata.mv.plans.modular.{Select, _}
import org.apache.carbondata.mv.plans.util.SQLBuildDSL.Fragment
// TODO: generalize this using SQLBuild to handle nested sub-query expression, using the
// correspondence
// SQLBuild <--> QueryRewrite
class SQLBuilder private(
modularPlan: ModularPlan,
nextSubqueryId: AtomicLong,
subqueryPrefix: String) {
def this(modularPlan: ModularPlan) = {
this(
SQLBuilder.subqueryPreprocess(modularPlan, new AtomicLong()),
new AtomicLong(0),
"gen_subquery_")
}
// only allow one level scalar subquery
def this(modularPlan: ModularPlan, subqueryPrefix: String) = {
this(SQLBuilder.screenOutPlanWithSubquery(modularPlan), new AtomicLong(0), subqueryPrefix)
}
private def newSubqueryName(): String = {
s"${ subqueryPrefix }${
nextSubqueryId.getAndIncrement()
}"
}
def fragmentExtract: Fragment = {
val finalPlan: ModularPlan = SQLizer.execute(modularPlan)
SQLBuildDSL.fragmentExtract(finalPlan, None)
}
object SQLizer extends RuleExecutor[ModularPlan] {
override protected def batches: Seq[Batch] = {
Seq(
Batch(
"Recover Scoping Info", Once,
// Clean up qualifiers due to erasure of sub-query aliases to use base tables as
// qualifiers
// This complements AddSubquery to handle corner case qMV3 in Tpcds_1_4_Querybatch.scala
CleanupQualifier,
// Insert sub queries on top of operators that need to appear after FROM clause.
AddSubquery
// Removes [[Cast Casts]] that are unnecessary when converting back to SQL
// Comment out for now, will add later by converting AttributMap to Map in SQLBuildDSL
// .scala
// RemoveCasts
)
)
}
object CleanupQualifier extends Rule[ModularPlan] {
override def apply(tree: ModularPlan): ModularPlan = {
tree transformDown {
case s@modular.Select(_, _, _, _, _,
Seq(g@modular.GroupBy(_, _, _, _,
s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)), _, _, _, _)
if !s.rewritten && s2.children.forall { _.isInstanceOf[modular.LeafNode] } =>
val attrMap = AttributeMap(s2.outputList
.collect { case a: Alias if a.child.isInstanceOf[Attribute] => (a.toAttribute, a) })
cleanupQualifier(s, s2.aliasMap, s2.children, attrMap)
case g@modular.GroupBy(_, _, _, _,
s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)
if !g.rewritten && s2.children.forall { _.isInstanceOf[modular.LeafNode] } =>
val attrMap = AttributeMap(s2.outputList
.collect { case a: Alias if a.child.isInstanceOf[Attribute] => (a.toAttribute, a) })
cleanupQualifier(g, s2.aliasMap, s2.children, attrMap)
case s@modular.Select(_, _, _, _, _, _, _, _, _, _)
if !s.rewritten && s.children.forall { _.isInstanceOf[modular.LeafNode] } =>
cleanupQualifier(s, s.aliasMap, s.children, AttributeMap(Seq.empty[(Attribute, Alias)]))
}
}
private def cleanupQualifier(node: ModularPlan,
aliasMap: Map[Int, String],
children: Seq[ModularPlan],
attrMap: AttributeMap[Alias]): ModularPlan = {
node transform {
case plan if !plan.rewritten && !plan.isInstanceOf[modular.LeafNode] =>
plan.transformExpressions {
case ref: Attribute =>
val i = children.indexWhere(_.outputSet.contains(ref))
if (i > -1) {
// this is a walk around for mystery of spark qualifier
if (aliasMap.nonEmpty && aliasMap(i).nonEmpty) {
AttributeReference(
ref.name,
ref.dataType)(exprId = ref.exprId, qualifier = Option(aliasMap(i)))
} else {
ref
}
} else {
attrMap.get(ref) match {
case Some(alias) =>
AttributeReference(
alias.child.asInstanceOf[AttributeReference].name,
ref.dataType)(exprId = ref.exprId,
alias.child.asInstanceOf[AttributeReference].qualifier)
case None => ref
}
}
}
}
}
}
object AddSubquery extends Rule[ModularPlan] {
override def apply(tree: ModularPlan): ModularPlan = {
tree transformUp {
case s@modular.Select(_, _, _, _, _,
Seq(g@modular.GroupBy(_, _, _, _,
s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)), _, _, _, _)
if s2.children.forall { _.isInstanceOf[modular.LeafNode] } => s
case g@modular.GroupBy(_, _, _, _,
s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)
if s2.children.forall { _.isInstanceOf[modular.LeafNode] } => g
case s@modular.Select(_, _, _, _, _, _, _, _, _, _)
if !s.rewritten && !s.children.forall { _.isInstanceOf[modular.LeafNode] } =>
var list: List[(Int, String)] = List()
var newS = s.copy()
s.children.zipWithIndex.filterNot { _._1.isInstanceOf[modular.LeafNode] }.foreach {
case (child: ModularPlan, index) if (!s.aliasMap.contains(index)) =>
val subqueryName = newSubqueryName()
val windowAttributeSet = if (child.isInstanceOf[Select]) {
val windowExprs = child.asInstanceOf[Select].windowSpec
.map { case Seq(expr) => expr.asInstanceOf[Seq[NamedExpression]] }
.foldLeft(Seq.empty.asInstanceOf[Seq[NamedExpression]])(_ ++ _)
SQLBuilder.collectAttributeSet(windowExprs)
} else {
AttributeSet.empty
}
val subqueryAttributeSet = child.outputSet ++ windowAttributeSet
// TODO: how to use alias to avoid duplicate names with distinct
// exprIds
// val duplicateNames = collectDuplicateNames(subqueryAttributeSet)
// .toSet
if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) {
new UnsupportedOperationException(s"duplicate name(s): ${
child.output
.map(_.toString + ", ")
}")
}
list = list :+ ((index, subqueryName))
newS = newS.transformExpressions {
case ref: Attribute if (subqueryAttributeSet.contains(ref)) =>
AttributeReference(ref.name, ref.dataType)(
exprId = ref.exprId,
qualifier = Some(subqueryName))
case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
Alias(alias.child, alias.name)(
exprId = alias.exprId,
qualifier = Some(subqueryName))
}
case _ =>
}
if (list.nonEmpty) {
list = list ++ s.aliasMap.toSeq
newS.copy(aliasMap = list.toMap)
} else {
newS
}
case g@modular.GroupBy(_, _, _, _, _, _, _, _) if (!g.rewritten && g.alias.isEmpty) =>
val newG = if (g.outputList.isEmpty) {
val ol = g.predicateList.map { case a: Attribute => a }
g.copy(outputList = ol)
} else {
g
}
val subqueryName = newSubqueryName()
val subqueryAttributeSet = newG.child.outputSet
if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) {
new UnsupportedOperationException(s"duplicate name(s): ${
newG.child.output.map(_.toString + ", ")
}")
}
newG.transformExpressions {
case ref: AttributeReference if (subqueryAttributeSet.contains(ref)) =>
AttributeReference(ref.name, ref.dataType)(
exprId = ref.exprId,
qualifier = Some(subqueryName))
case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
Alias(alias.child, alias.name)(
exprId = alias.exprId,
qualifier = Some(subqueryName))
}.copy(alias = Some(subqueryName))
}
}
}
}
}
object SQLBuilder {
def collectAttributeSet(outputList: Seq[Expression]): AttributeSet = {
AttributeSet(outputList
.collect {
case a@Alias(_, _) => a.toAttribute
case a: Attribute => a
})
}
def collectDuplicateNames(s: AttributeSet): immutable.Iterable[String] = {
s.baseSet.map(_.a).groupBy(_.name).collect { case (x, ys) if ys.size > 1 => x }
}
def subqueryPreprocess(plan: ModularPlan, nextScalarSubqueryId: AtomicLong): ModularPlan = {
def newSubqueryName(nextScalarSubqueryId: AtomicLong): String = {
s"gen_expression_${
nextScalarSubqueryId
.getAndIncrement()
}_"
}
plan.transformAllExpressions {
case s: ModularSubquery =>
if (s.children.isEmpty) {
val subqueryName = newSubqueryName(nextScalarSubqueryId)
SubqueryHolder(s"(${ s.plan.asOneLineSQL(subqueryName) })")
} else {
throw new UnsupportedOperationException(s"Expression $s can't converted to SQL")
}
case o => o
}
}
def screenOutPlanWithSubquery(plan: ModularPlan): ModularPlan = {
plan.transformAllExpressions {
case s: ModularSubquery =>
throw new UnsupportedOperationException(s"Nested scala subquery $s doesn't supported")
case e => e
}
}
}