| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.mv.plans.util |
| |
import org.apache.spark.sql.CarbonToSparkAdapter
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.SparkSQLUtil
| |
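/**
 * Mirrors the rule batches of Spark's built-in logical [[Optimizer]] so that logical plans
 * can be optimized independently of a SparkSession, e.g. when canonicalizing plans for
 * materialized view matching.
 */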
| object BirdcageOptimizer extends RuleExecutor[LogicalPlan] { |
| |
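// A default SQLConf: the optimizer is not bound to a live SparkSession,
// so Spark's default configuration values apply.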
| val conf = new SQLConf() |
| |
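// Fixed-point batches run until the plan stops changing, capped at
// conf.optimizerMaxIterations (spark.sql.optimizer.maxIterations).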
| protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations) |
| |
| def batches: Seq[Batch] = { |
| // Technically some of the rules in Finish Analysis are not optimizer rules and belong more |
| // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). |
// However, because we also use the analyzer to canonicalize queries (for view definitions),
| // we do not eliminate subqueries or compute current time in the analyzer. |
| Batch( |
| "Finish Analysis", Once, |
| EliminateSubqueryAliases, |
| SparkSQLUtil.getEliminateViewObj(), |
| ReplaceExpressions, |
| ComputeCurrentTime, |
| // GetCurrentDatabase(sessionCatalog), |
| RewriteDistinctAggregates, |
| ReplaceDeduplicateWithAggregate) :: |
| ////////////////////////////////////////////////////////////////////////////////////////// |
| // Optimizer rules start here |
| ////////////////////////////////////////////////////////////////////////////////////////// |
| // - Do the first call of CombineUnions before starting the major Optimizer rules, |
// since it can reduce the number of iterations and the other rules could add/move
| // extra operators between two adjacent Union operators. |
| // - Call CombineUnions again in Batch("Operator Optimizations"), |
// since the other rules might make two separate Union operators adjacent.
| Batch( |
| "Union", Once, |
| CombineUnions) :: |
| Batch( |
| "Pullup Correlated Expressions", Once, |
| SparkSQLUtil.getPullupCorrelatedPredicatesObj()) :: |
| Batch( |
| "Subquery", Once, |
| OptimizeSubqueries) :: |
| Batch( |
| "Replace Operators", fixedPoint, |
| ReplaceIntersectWithSemiJoin, |
| ReplaceExceptWithAntiJoin, |
| ReplaceDistinctWithAggregate) :: |
| Batch( |
| "Aggregate", fixedPoint, |
| RemoveLiteralFromGroupExpressions, |
| RemoveRepetitionFromGroupExpressions) :: |
| Batch( |
| "Operator Optimizations", fixedPoint, Seq( |
| // Operator push down |
| PushProjectionThroughUnion, |
| SparkSQLUtil.getReorderJoinObj(conf), |
| SparkSQLUtil.getEliminateOuterJoinObj(conf), |
| PushPredicateThroughJoin, |
| PushDownPredicate, |
| // LimitPushDown(conf), |
| ColumnPruning, |
| // InferFiltersFromConstraints(conf), |
| // Operator combine |
| CollapseRepartition, |
| CollapseProject, |
| CollapseWindow, |
| CombineFilters, |
| CombineLimits, |
| CombineUnions, |
| // Constant folding and strength reduction |
| SparkSQLUtil.getNullPropagationObj(conf), |
| FoldablePropagation, |
| // OptimizeIn(conf), |
| ConstantFolding, |
| ReorderAssociativeOperator, |
// No need to apply the LikeSimplification rule while creating a datamap,
// since the modular plan's asCompactSql will be set in the datamap schema
| // LikeSimplification, |
| BooleanSimplification, |
| SimplifyConditionals, |
| RemoveDispensableExpressions, |
| SimplifyBinaryComparison, |
| // PruneFilters(conf), |
| EliminateSorts, |
| SimplifyCasts, |
| SimplifyCaseConversionExpressions, |
| RewriteCorrelatedScalarSubquery, |
| EliminateSerialization, |
| SparkSQLUtil.getRemoveRedundantAliasesObj(), |
| RemoveRedundantProject, |
| SimplifyCreateStructOps, |
| SimplifyCreateArrayOps, |
| SimplifyCreateMapOps) ++ |
| extendedOperatorOptimizationRules: _*) :: |
| Batch( |
| "Check Cartesian Products", Once, |
| SparkSQLUtil.getCheckCartesianProductsObj(conf)) :: |
| // Batch("Join Reorder", Once, |
| // CostBasedJoinReorder(conf)) :: |
| // Batch("Decimal Optimizations", fixedPoint, |
| // DecimalAggregates(conf)) :: |
| Batch( |
| "Object Expressions Optimization", fixedPoint, |
| EliminateMapObjects, |
| CombineTypedFilters) :: |
| // Batch("LocalRelation", fixedPoint, |
| // ConvertToLocalRelation, |
| // PropagateEmptyRelation) :: |
| Batch( |
| "OptimizeCodegen", Once, CarbonToSparkAdapter.getOptimizeCodegenRule(conf): _*) :: |
| Batch( |
| "RewriteSubquery", Once, |
| RewritePredicateSubquery, |
| CollapseProject) :: Nil |
| } |
| |
| /** |
* Optimizes all the subqueries inside expressions.
| */ |
| object OptimizeSubqueries extends Rule[LogicalPlan] { |
| def apply(plan: LogicalPlan): LogicalPlan = { |
| plan transformAllExpressions { |
| case s: SubqueryExpression => |
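// Recursively run the full batch pipeline on the subquery's plan, then
// splice the optimized plan back into the enclosing expression.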
| val Subquery(newPlan) = BirdcageOptimizer.this.execute(Subquery(s.plan)) |
| s.withNewPlan(newPlan) |
| } |
| } |
| } |
| |
| /** |
| * Override to provide additional rules for the operator optimization batch. |
| */ |
| def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = { |
| Nil |
| } |
| } |
| |
| /** |
| * Push Aggregate through join to fact table. |
| * Pushes down [[Aggregate]] operators where the `grouping` and `aggregate` expressions can |
| * be evaluated using only the attributes of the fact table, the left or right side of a |
| * star-join. |
| * Other [[Aggregate]] expressions stay in the original [[Aggregate]]. |
| * |
| * Check 'Aggregate Pushdown Over Join: Design & Preliminary Results' by LiTao for more details |
| */ |
// case class PushAggregateThroughJoin(conf: SQLConf) extends Rule[LogicalPlan]
//   with PredicateHelper {
//
//   // Fact/dimension table cluster, read as JSON from the configuration.
//   val tableCluster = {
//     val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
//     val tableClusterString = conf.getConfString("spark.mv.tableCluster")
//     mapper.readValue(tableClusterString, classOf[TableCluster])
//   }
//
//   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
//
//     // Push down aggregate expressions through Join; the guard is left unfinished
//     // in this sketch (see canPushThrough below)
//     case a @ Aggregate(grouping, aggregate, Project(projectList, Join(left, right, jt, cond)))
//       if left.isInstanceOf[LeafNode] /* && ... */ =>
//       val fTables: Set[String] = tableCluster.getFact
//       val dTables: Set[String] = tableCluster.getDimension
//
//       // If the left side is a fact table, pre-aggregate it below the join;
//       // newGrouping, newAggregate and projectList1 would be derived from `a`.
//       if (fTables.contains(s"${left.databaseName}.${left.tableName}")) {
//         Aggregate(newGrouping, newAggregate,
//           Project(projectList,
//             Join(Aggregate(_, _, Project(projectList1, left)), right, jt, cond)))
//       }
//   }
//
//   private def canPushThrough(join: Join): Boolean = join match {
//     case Join(left: LeafNode, right: LeafNode, Inner,
//       EqualTo(l: AttributeReference, r: AttributeReference)) => true
//     case _ => false
//   }
// }