| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.mv.plans.util |
| |
| import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation} |
| import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression, PredicateHelper} |
| import org.apache.spark.sql.catalyst.plans._ |
| import org.apache.spark.sql.catalyst.plans.logical._ |
| import org.apache.spark.sql.execution.datasources.LogicalRelation |
| import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil} |
| |
| import org.apache.carbondata.mv.plans.modular.Flags._ |
| import org.apache.carbondata.mv.plans.modular.JoinEdge |
| |
| |
| |
| /** |
| * SelectModule is extracted from logical plan of SPJG query. All join conditions |
| * filter, and project operators in a single Aggregate-less subtree of logical plan |
| * are collected. |
| * |
| * The returned values for this match are as follows: |
| * - Conditions for equi-join |
| * - Conditions for filter |
| * - Project list for project |
| * |
| */ |
| object ExtractSelectModule extends PredicateHelper { |
| type ReturnType = (Seq[NamedExpression], Seq[Expression], Seq[Expression], Map[Int, String], |
| Seq[JoinEdge], Seq[LogicalPlan], FlagSet, Seq[Seq[Any]], Seq[Seq[Any]]) |
| |
| def unapply(plan: LogicalPlan): Option[ReturnType] = { |
| val (outputs, inputs, predicates, joinedges, children, isSelect, _, flags, fspecs, wspecs) = |
| collectProjectsFiltersJoinsAndSort(plan) |
| if (!isSelect) { |
| None |
| } else { |
| Some( |
| outputs, |
| inputs, |
| predicates, |
| collectChildAliasMappings( |
| AttributeSet(outputs).toSeq ++ AttributeSet(predicates).toSeq, |
| children), |
| joinedges, |
| children, |
| flags, |
| fspecs, |
| wspecs) |
| } |
| } |
| |
| def collectProjectsFiltersJoinsAndSort(plan: LogicalPlan): (Seq[NamedExpression], |
| Seq[Expression], Seq[Expression], Seq[JoinEdge], Seq[LogicalPlan], Boolean, Map[Attribute, |
| Expression], FlagSet, Seq[Seq[Any]], Seq[Seq[Any]]) = { |
| plan match { |
| case Project(fields, child) => |
| val (_, inputs, predicates, joinedges, children, _, aliases, flags, fspecs, wspecs) = |
| collectProjectsFiltersJoinsAndSort(child) |
| val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]] |
| (substitutedFields, inputs, predicates, joinedges, children, true, collectAliases( |
| substitutedFields), flags, fspecs, wspecs) |
| |
| case Filter(condition, child) => |
| val (outputs, inputs, predicates, joinedges, children, _, aliases, flags, fspecs, wspecs) |
| = collectProjectsFiltersJoinsAndSort(child) |
| val substitutedCondition = substitute(aliases)(condition) |
| (outputs, inputs, predicates.flatMap(splitConjunctivePredicates) ++ |
| splitConjunctivePredicates(substitutedCondition), joinedges, children, |
| true, aliases, flags, fspecs, wspecs) |
| |
| case Sort(order, global, child) => |
| val (outputs, inputs, predicates, joinedges, children, _, aliases, flags, fspecs, wspecs) |
| = collectProjectsFiltersJoinsAndSort(child) |
| val substitutedOrder = order.map(substitute(aliases)) |
| (outputs, inputs, predicates, joinedges, children, true, aliases, if (global) { |
| flags.setFlag(SORT).setFlag(GLOBAL) |
| } else { |
| flags.setFlag(SORT) |
| }, Seq(Seq(order)) ++ fspecs, wspecs) |
| |
| case Join(left, right, joinType, condition) => |
| val (loutputs, linputs, lpredicates, ljoinedges, lchildren, _, laliases, lflags, lfspecs, |
| lwspecs) = collectProjectsFiltersJoinsAndSort(left) |
| val (routputs, rinputs, rpredicates, rjoinedges, rchildren, _, raliases, rflags, rfspecs, |
| rwspecs) = collectProjectsFiltersJoinsAndSort(right) |
| val (lcondition, rcondition, ccondition) = split(condition, lchildren, rchildren) |
| val joinEdge = collectJoinEdge(ccondition, lchildren, rchildren, joinType) |
| val adjustedJoinEdges = rjoinedges |
| .map(e => JoinEdge(e.left + lchildren.size, e.right + lchildren.size, e.joinType)) |
| val output: Seq[Attribute] = { |
| joinType match { |
| case LeftSemi => |
| left.output |
| case LeftOuter => |
| left.output ++ right.output.map(_.withNullability(true)) |
| case RightOuter => |
| left.output.map(_.withNullability(true)) ++ right.output |
| case FullOuter => |
| left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) |
| case LeftAnti => |
| left.output |
| case _ => |
| left.output ++ right.output |
| } |
| } |
| if (lfspecs.isEmpty && rfspecs.isEmpty && lflags == NoFlags && rflags == NoFlags && |
| lwspecs.isEmpty && rwspecs.isEmpty) { |
| (output, (linputs ++ rinputs), lpredicates.flatMap(splitConjunctivePredicates) ++ |
| rpredicates.flatMap(splitConjunctivePredicates) ++ |
| lcondition ++ rcondition ++ ccondition, ljoinedges ++ |
| joinEdge ++ |
| adjustedJoinEdges, |
| lchildren ++ rchildren, true, laliases ++ raliases, NoFlags, Seq.empty, Seq.empty) |
| } else { |
| throw new UnsupportedOperationException( |
| s"unsupported join: \n left child ${ left } " + |
| s"\n right child ${ right }") |
| } |
| |
| // when select * is executed with limit, ColumnPruning rule will remove the project node from |
| // the plan during optimization, so if child of Limit is relation, then make the select node |
| // and make the modular plan |
| case Limit(limitExpr, lr: LogicalRelation) => |
| (lr.output, lr.output, Nil, Nil, Seq(lr), true, Map.empty, NoFlags, Seq.empty, Seq |
| .empty) |
| |
| case other => |
| (other.output, other.output, Nil, Nil, Seq(other), false, Map.empty, NoFlags, Seq.empty, Seq |
| .empty) |
| } |
| } |
| |
| def collectAliases(fields: Seq[Expression]): Map[Attribute, Expression] = { |
| fields.collect { |
| case a@Alias(child, _) => a.toAttribute -> child |
| }.toMap |
| } |
| |
| def substitute(aliases: Map[Attribute, Expression])(expr: Expression): Expression = { |
| expr.transform { |
| case a@Alias(ref: AttributeReference, name) => |
| aliases.get(ref).map(Alias(_, name)(a.exprId, a.qualifier)).getOrElse(a) |
| |
| case a: AttributeReference => |
| aliases.get(a).map(Alias(_, a.name)(a.exprId, a.qualifier)).getOrElse(a) |
| } |
| } |
| |
| def collectChildAliasMappings(attributeSet: Seq[Attribute], children: Seq[LogicalPlan] |
| ): Map[Int, String] = { |
| val aq = attributeSet.filter(_.qualifier.nonEmpty) |
| children.zipWithIndex.flatMap { |
| case (child, i) => |
| aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _)) |
| }.toMap |
| } |
| |
| def split(condition: Option[Expression], |
| lchildren: Seq[LogicalPlan], |
| rchildren: Seq[LogicalPlan]): (Seq[Expression], Seq[Expression], Seq[Expression]) = { |
| val left = lchildren.map(_.outputSet).foldLeft(AttributeSet(Set.empty))(_ ++ _) |
| val right = rchildren.map(_.outputSet).foldLeft(AttributeSet(Set.empty))(_ ++ _) |
| val conditions = condition.map(splitConjunctivePredicates).getOrElse(Nil) |
| val (leftEvaluationCondition, rest) = conditions.partition(_.references subsetOf left) |
| val (rightEvaluationCondition, commonCondition) = rest.partition(_.references subsetOf right) |
| (leftEvaluationCondition, rightEvaluationCondition, commonCondition) |
| } |
| |
| /* |
| * collectJoinEdge only valid when condition are common condition of above split, left and |
| * right children correspond |
| * to respective two children parameters of above split |
| * |
| */ |
| def collectJoinEdge(condition: Seq[Expression], |
| lchildren: Seq[LogicalPlan], |
| rchildren: Seq[LogicalPlan], |
| joinType: JoinType): Seq[JoinEdge] = { |
| val common = condition.map(_.references).foldLeft(AttributeSet(Set.empty))(_ ++ _) |
| val lIdxSeq = lchildren |
| .collect { case x if x.outputSet.intersect(common).nonEmpty => lchildren.indexOf(x) } |
| val rIdxSeq = rchildren |
| .collect { case x if x.outputSet.intersect(common).nonEmpty => rchildren.indexOf(x) + |
| lchildren.size |
| } |
| for (l <- lIdxSeq; r <- rIdxSeq) yield { |
| JoinEdge(l, r, joinType) |
| } |
| } |
| |
| } |
| |
| object ExtractSelectModuleForWindow extends PredicateHelper { |
| type ReturnType = (Seq[NamedExpression], Seq[Expression], Seq[Expression], Map[Int, String], |
| Seq[JoinEdge], Seq[LogicalPlan], FlagSet, Seq[Seq[Any]], Seq[Seq[Any]]) |
| |
| def unapply(plan: LogicalPlan): Option[ReturnType] = { |
| collectSelectFromWindowChild(plan) |
| } |
| |
| def collectSelectFromWindowChild(plan: LogicalPlan): Option[(Seq[NamedExpression], |
| Seq[Expression], Seq[Expression], Map[Int, String], Seq[JoinEdge], Seq[LogicalPlan], FlagSet, |
| Seq[Seq[Any]], Seq[Seq[Any]])] = { |
| plan match { |
| case agg@Aggregate(_, _, _) => |
| Some( |
| agg.aggregateExpressions, |
| agg.child.output, |
| Seq.empty, |
| Map.empty, |
| Seq.empty, |
| Seq(agg), |
| NoFlags, |
| Seq.empty, |
| Seq.empty) |
| case ExtractSelectModule( |
| output, |
| input, |
| predicate, |
| aliasmap, |
| joinedge, |
| children, |
| flags, |
| fspec, |
| wspec) => |
| Some(output, input, predicate, aliasmap, joinedge, children, flags, fspec, wspec) |
| case Window(exprs, _, _, child) => |
| val ret: Option[(Seq[NamedExpression], Seq[Expression], Seq[Expression], Map[Int, |
| String], Seq[JoinEdge], Seq[LogicalPlan], FlagSet, Seq[Seq[Any]], Seq[Seq[Any]])] = |
| collectSelectFromWindowChild( |
| child) |
| ret.map(r => (r._1, r._2, r._3, r._4, r._5, r._6, r._7, r._8, Seq(Seq(exprs)) ++ r._9)) |
| case other => None |
| } |
| } |
| } |
| |
| /** |
| * GroupByModule is extracted from the Aggregate node of logical plan. |
| * The groupingExpressions, aggregateExpressions are collected. |
| * |
| * The returned values for this match are as follows: |
| * - Grouping attributes for the Aggregate node. |
| * - Aggregates for the Aggregate node. |
| * - Project list for project |
| * |
| */ |
| |
| object ExtractGroupByModule extends PredicateHelper { |
| type ReturnType = (Seq[NamedExpression], Seq[Expression], Seq[Expression], Option[String], |
| LogicalPlan, FlagSet, Seq[Seq[Any]]) |
| |
| def unapply(plan: LogicalPlan): Option[ReturnType] = { |
| plan match { |
| case a@logical.Aggregate(_, _, e@Expand(_, _, p: Project)) if isGroupingSet(a, e, p) => |
| // Assumption: Aggregate's groupingExpressions is composed of |
| // 1) the grouping attributes |
| // 2) gid, which is always the last one |
| val g = a.groupingExpressions.map(_.asInstanceOf[Attribute]) |
| val numOriginalOutput = e.output.size - g.size |
| Some( |
| a.aggregateExpressions, |
| e.output, |
| a.groupingExpressions, |
| None, |
| p, |
| NoFlags.setFlag(EXPAND), |
| Seq(Seq(e.projections, e.output, numOriginalOutput))) |
| case logical.Aggregate(groupingExpressions, aggregateExpressions, child) => |
| Some( |
| aggregateExpressions, |
| child.output, |
| groupingExpressions, |
| None, |
| child, |
| NoFlags, |
| Seq.empty) |
| case other => None |
| } |
| } |
| |
| private def isGroupingSet(a: Aggregate, e: Expand, p: Project): Boolean = { |
| assert(a.child == e && e.child == p) |
| |
| if (a.groupingExpressions.forall(_.isInstanceOf[Attribute])) { |
| val g = a.groupingExpressions.map(_.asInstanceOf[Attribute]) |
| sameOutput( |
| e.output.drop(e.output.size - g.size), |
| a.groupingExpressions.map(_.asInstanceOf[Attribute])) |
| } else { |
| false |
| } |
| } |
| |
| private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = { |
| output1.size == output2.size && |
| output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2)) |
| } |
| } |
| |
| object ExtractUnionModule extends PredicateHelper { |
| type ReturnType = (Seq[LogicalPlan], FlagSet, Seq[Seq[Any]]) |
| |
| def unapply(plan: LogicalPlan): Option[ReturnType] = { |
| plan match { |
| case u: Union => |
| val children = collectUnionChildren(u) |
| Some(children, NoFlags, Seq.empty) |
| case _ => None |
| } |
| } |
| |
| private def collectUnionChildren(plan: LogicalPlan): List[LogicalPlan] = { |
| plan match { |
| case Union(children) => children.toList match { |
| case head :: Nil => collectUnionChildren(head) |
| case head :: tail => collectUnionChildren(head) ++ collectUnionChildren(Union(tail)) |
| case Nil => Nil |
| } |
| case other => other :: Nil |
| } |
| } |
| } |
| |
| object ExtractTableModule extends PredicateHelper { |
| type ReturnType = (String, String, Seq[NamedExpression], Seq[LogicalPlan], FlagSet, Seq[Seq[Any]]) |
| |
| def unapply(plan: LogicalPlan): Option[ReturnType] = { |
| plan match { |
| // uncomment for cloudera1 version |
| // case m: CatalogRelation => |
| // Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags, |
| // Seq.empty) |
| // uncomment for apache version |
| case m: HiveTableRelation => |
| Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags, |
| Seq.empty) |
| case l: LogicalRelation => |
| val tableIdentifier = l.catalogTable.map(_.identifier) |
| val database = tableIdentifier.map(_.database).flatten.getOrElse(null) |
| val table = tableIdentifier.map(_.table).getOrElse(null) |
| Some(database, table, l.output, Nil, NoFlags, Seq.empty) |
| case l: LocalRelation => // used for unit test |
| Some(null, null, l.output, Nil, NoFlags, Seq.empty) |
| case _ => |
| // this check is added as we get MetastoreRelation in spark2.1, |
| // this is removed in later spark version |
| // TODO: this check can be removed once 2.1 support is removed from carbon |
| if (SparkUtil.isSparkVersionEqualTo("2.1") && |
| plan.getClass.getName.equals("org.apache.spark.sql.hive.MetastoreRelation")) { |
| val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("catalogTable", plan) |
| .asInstanceOf[CatalogTable] |
| Some(catalogTable.database, |
| catalogTable.identifier.table, |
| plan.output, |
| Nil, |
| NoFlags, |
| Seq.empty) |
| } else { |
| None |
| } |
| } |
| } |
| } |