blob: 2918bf97891959afb213c793a9ba7c6d3c532ed0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.secondaryindex.optimizer
import org.apache.log4j.Logger
import org.apache.spark.sql.{CarbonUtils, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
import org.apache.spark.util.SparkUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
/**
* Optimizer rule that rewrites the plan when the query has a filter on a secondary index table column
*/
class CarbonSITransformationRule(sparkSession: SparkSession)
  extends Rule[LogicalPlan] with PredicateHelper {

  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)

  val secondaryIndexOptimizer: CarbonSecondaryIndexOptimizer =
    new CarbonSecondaryIndexOptimizer(sparkSession)

  /**
   * Rewrites `plan` with `secondaryIndexOptimizer.transformFilterToJoin` when
   * `checkIfRuleNeedToBeApplied` accepts it; otherwise returns `plan` unchanged.
   *
   * @param plan the logical plan being optimized
   * @return the rewritten plan, or `plan` itself when the rule does not apply
   */
  def apply(plan: LogicalPlan): LogicalPlan = {
    if (checkIfRuleNeedToBeApplied(plan)) {
      secondaryIndexOptimizer.transformFilterToJoin(plan, isProjectionNeeded(plan))
    } else {
      plan
    }
  }

  /**
   * Decides whether the secondary-index rewrite should run on `plan`.
   *
   * Returns false when the plan contains no carbon relation, or when it contains a
   * create-table-as-select command. Otherwise the plan's nodes are scanned in pre-order
   * until the first `Join` whose condition is an `EqualTo`:
   *  - if that condition is `positionId = positionReference` (presumably the join emitted by
   *    an earlier secondary-index rewrite), returns false;
   *  - otherwise, or when no such join exists, returns true iff at least one non-`Join` node
   *    was visited before the scan stopped.
   *
   * NOTE(review): an ordinary equi-join stops the scan, so a `positionId = positionReference`
   * join nested below it is never seen, and a plan whose root is an ordinary equi-join yields
   * false. This reproduces the historical behaviour exactly; confirm whether it is intended.
   *
   * @param plan the logical plan being optimized
   * @return true if the rewrite should be applied
   */
  private def checkIfRuleNeedToBeApplied(plan: LogicalPlan): Boolean = {
    val hasCarbonRelation = CarbonUtils.collectCarbonRelation(plan).nonEmpty
    if (!hasCarbonRelation || isCreateTableAsSelect(plan)) {
      false
    } else {
      // TreeNode.collect visits nodes in pre-order (node first, then children left to right),
      // which is the order the decision below depends on.
      val nodesInPreOrder: Seq[LogicalPlan] = plan.collect { case node => node }
      // Split at the first equi-join; everything before it is what the scan has "seen".
      val (scannedNodes, remainingNodes) =
        nodesInPreOrder.span(node => equiJoinCondition(node).isEmpty)
      val sawNonJoinNode = scannedNodes.exists {
        case _: Join => false
        case _ => true
      }
      remainingNodes.headOption.flatMap(equiJoinCondition) match {
        case Some(equalTo) if isSIJoinCondition(equalTo) => false
        case _ => sawNonJoinNode
      }
    }
  }

  /** Returns the join condition of `node` if it is a `Join` whose condition is an `EqualTo`. */
  private def equiJoinCondition(node: LogicalPlan): Option[EqualTo] = node match {
    // Type pattern plus `.condition` avoids depending on Join's constructor arity.
    case join: Join => join.condition.collect { case equalTo: EqualTo => equalTo }
    case _ => None
  }

  /**
   * True when `equalTo` compares an attribute named positionId (left) with an attribute named
   * positionReference (right), ignoring case.
   */
  private def isSIJoinCondition(equalTo: EqualTo): Boolean = (equalTo.left, equalTo.right) match {
    case (left: AttributeReference, right: AttributeReference) =>
      left.name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) &&
        right.name.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)
    case _ => false
  }

  /**
   * Checks whether the plan contains a create-table-as-select command, or an
   * `InsertIntoHadoopFsRelationCommand` whose file format is not carbon (hive, parquet etc).
   * In that case the transformed plan needs an extra projection, because the positionId and
   * positionReference columns are added to the output of the plan whether or not the query
   * requested them.
   *
   * Only evaluated on Spark 2.3 and above; always false on older versions.
   *
   * NOTE(review): `apply` only calls this after `checkIfRuleNeedToBeApplied` has already
   * rejected plans containing create-table-as-select commands (under the same version check),
   * so from `apply` only the insert case can make this true.
   *
   * @param plan the logical plan being optimized
   * @return true if the rewritten plan must add the extra projection
   */
  private def isProjectionNeeded(plan: LogicalPlan): Boolean = {
    SparkUtil.isSparkVersionXandAbove("2.3") && plan.find {
      case insert: InsertIntoHadoopFsRelationCommand =>
        !insert.fileFormat.toString.equals("carbon")
      case node => isCreateTableAsSelectNode(node)
    }.isDefined
  }

  /**
   * Checks whether the plan contains a create-table-as-select command.
   * Only evaluated on Spark 2.3 and above; always false on older versions.
   *
   * @param plan the logical plan being optimized
   * @return true if any node of the plan is a create-table-as-select command
   */
  private def isCreateTableAsSelect(plan: LogicalPlan): Boolean = {
    SparkUtil.isSparkVersionXandAbove("2.3") && plan.find(isCreateTableAsSelectNode).isDefined
  }

  /** True when `node` itself is one of the recognised create-table-as-select commands. */
  private def isCreateTableAsSelectNode(node: LogicalPlan): Boolean = node match {
    // Type patterns rather than `Cmd(_, _, _, _)` extractors so the match does not depend on
    // the constructor arity of these commands in a particular Spark version.
    case _: CreateHiveTableAsSelectCommand => true
    case _: CreateDataSourceTableAsSelectCommand => true
    // Matched by name, presumably because the class is not available on every supported
    // Spark version at compile time.
    case other => other.getClass.getSimpleName.equals("OptimizedCreateHiveTableAsSelectCommand")
  }
}