/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.optimizer
import java.util
import scala.collection.JavaConverters._
import scala.util.Try
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.CastExpressionOptimization
import org.apache.spark.sql.types._
import org.apache.spark.sql.CarbonContainsWith
import org.apache.spark.sql.CarbonEndsWith
import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression, MatchExpression}
import org.apache.carbondata.core.scan.expression.conditional._
import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.datamap.{TextMatch, TextMatchLimit}
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
/**
* All filter conversions are done here.
*/
object CarbonFilters {
val carbonProperties = CarbonProperties.getInstance()
/**
* Converts data sources filters to carbon filter predicates.
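*
* Illustrative usage sketch (the schema and values here are hypothetical):
* {{{
* val schema = StructType(Seq(StructField("name", StringType)))
* val expr = CarbonFilters.createCarbonFilter(schema, sources.EqualTo("name", "bob"))
* // expr is roughly Some(EqualToExpression(ColumnExpression("name"), LiteralExpression("bob")))
* }}}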
*/
def createCarbonFilter(schema: StructType,
predicate: sources.Filter): Option[CarbonExpression] = {
val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
predicate match {
case sources.EqualTo(name, value) =>
Some(new EqualToExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.Not(sources.EqualTo(name, value)) =>
Some(new NotEqualsExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.EqualNullSafe(name, value) =>
Some(new EqualToExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.Not(sources.EqualNullSafe(name, value)) =>
Some(new NotEqualsExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.GreaterThan(name, value) =>
Some(new GreaterThanExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.LessThan(name, value) =>
Some(new LessThanExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.GreaterThanOrEqual(name, value) =>
Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case sources.LessThanOrEqual(name, value) =>
Some(new LessThanEqualToExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
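// An IN with a single null value can never match, so it collapses to a
// FalseExpression; other null entries are simply dropped from the value list.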
case sources.In(name, values) =>
if (values.length == 1 && values(0) == null) {
Some(new FalseExpression(getCarbonExpression(name)))
} else {
Some(new InExpression(getCarbonExpression(name),
new ListExpression(
convertToJavaList(values.filterNot(_ == null)
.map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList))))
}
case sources.Not(sources.In(name, values)) =>
if (values.contains(null)) {
Some(new FalseExpression(getCarbonExpression(name)))
} else {
Some(new NotInExpression(getCarbonExpression(name),
new ListExpression(
convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
}
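// Carbon models IS NULL / IS NOT NULL as (not-)equals against a null literal, with
// the trailing boolean flag marking the expression as a null check.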
case sources.IsNull(name) =>
Some(new EqualToExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, null), true))
case sources.IsNotNull(name) =>
Some(new NotEqualsExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, null), true))
case sources.And(lhs, rhs) =>
(createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
case sources.Or(lhs, rhs) =>
for {
lhsFilter <- createFilter(lhs)
rhsFilter <- createFilter(rhs)
} yield {
new OrExpression(lhsFilter, rhsFilter)
}
case sources.StringStartsWith(name, value) if value.length > 0 =>
Some(new StartsWithExpression(getCarbonExpression(name),
getCarbonLiteralExpression(name, value)))
case CarbonEndsWith(expr: Expression) =>
Some(new SparkUnknownExpression(expr.transform {
case AttributeReference(name, dataType, _, _) =>
CarbonBoundReference(new CarbonColumnExpression(name.toString,
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType)),
dataType, expr.nullable)
}, ExpressionType.ENDSWITH))
case CarbonContainsWith(expr: Expression) =>
Some(new SparkUnknownExpression(expr.transform {
case AttributeReference(name, dataType, _, _) =>
CarbonBoundReference(new CarbonColumnExpression(name.toString,
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType)),
dataType, expr.nullable)
}, ExpressionType.CONTAINSWITH))
case CastExpr(expr: Expression) =>
Some(transformExpression(expr))
case FalseExpr() =>
Some(new FalseExpression(null))
case TextMatch(queryString) =>
Some(new MatchExpression(queryString))
case TextMatchLimit(queryString, maxDoc) =>
Some(new MatchExpression(queryString, Try(maxDoc.toInt).getOrElse(Integer.MAX_VALUE)))
case _ => None
}
}
def getCarbonExpression(name: String) = {
new CarbonColumnExpression(name,
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
}
def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
val dataTypeOfAttribute =
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name))
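// A Double value arriving against a STRING column (for instance when an upstream
// cast was optimized away) keeps DOUBLE as the literal type instead of being
// coerced to STRING.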
val dataType = if (Option(value).isDefined
&& dataTypeOfAttribute == CarbonDataTypes.STRING
&& value.isInstanceOf[Double]) {
CarbonDataTypes.DOUBLE
} else {
dataTypeOfAttribute
}
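// Binary filter values arrive as Array[Byte]; carbon expects the literal as a
// String, so convert before building the literal expression.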
val dataValue = if (dataTypeOfAttribute.equals(CarbonDataTypes.BINARY)
&& Option(value).isDefined) {
new String(value.asInstanceOf[Array[Byte]])
} else {
value
}
new CarbonLiteralExpression(dataValue, dataType)
}
createFilter(predicate)
}
// Check which filters can be pushed down to carbon; the remaining ones are handled
// in the spark layer. Mostly only dimension filters are pushed down, since those are
// faster to evaluate in carbon.
// TODO: The filters are first converted to intermediate sources.Filter expressions
// and then converted back to CarbonExpression. Instead of this two-step process,
// the evaluation could be merged into a single step.
def selectFilters(filters: Seq[Expression],
attrList: java.util.HashSet[AttributeReferenceWrapper],
aliasMap: CarbonAliasDecoderRelation): Unit = {
def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = {
expr match {
case orExpr@Or(left, right) =>
val leftFilter = translate(left, or = true)
val rightFilter = translate(right, or = true)
if (leftFilter.isDefined && rightFilter.isDefined) {
Some(sources.Or(leftFilter.get, rightFilter.get))
} else {
orExpr.collect {
case attr: AttributeReference =>
attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
None
}
case And(left, right) =>
val leftFilter = translate(left, or)
val rightFilter = translate(right, or)
if (or) {
if (leftFilter.isDefined && rightFilter.isDefined) {
(leftFilter ++ rightFilter).reduceOption(sources.And)
} else {
None
}
} else {
(leftFilter ++ rightFilter).reduceOption(sources.And)
}
case EqualTo(a: Attribute, Literal(v, t)) =>
Some(sources.EqualTo(a.name, v))
case EqualTo(Literal(v, t), a: Attribute) =>
Some(sources.EqualTo(a.name, v))
case c@EqualTo(Cast(a: Attribute, _), Literal(v, t)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case c@EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case Not(EqualTo(a: Attribute, Literal(v, t))) =>
Some(sources.Not(sources.EqualTo(a.name, v)))
case Not(EqualTo(Literal(v, t), a: Attribute)) =>
Some(sources.Not(sources.EqualTo(a.name, v)))
case c@Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case c@Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case IsNotNull(a: Attribute) =>
Some(sources.IsNotNull(a.name))
case IsNull(a: Attribute) =>
Some(sources.IsNull(a.name))
case Not(In(a: Attribute, list)) if list.forall(_.isInstanceOf[Literal]) =>
val hSet = list.map(e => e.eval(EmptyRow))
Some(sources.Not(sources.In(a.name, hSet.toArray)))
case In(a: Attribute, list) if list.forall(_.isInstanceOf[Literal]) =>
val hSet = list.map(e => e.eval(EmptyRow))
Some(sources.In(a.name, hSet.toArray))
case c@Not(In(Cast(a: Attribute, _), list)) if list.forall(_.isInstanceOf[Literal]) =>
Some(CastExpr(c))
case c@In(Cast(a: Attribute, _), list) if list.forall(_.isInstanceOf[Literal]) =>
Some(CastExpr(c))
case InSet(a: Attribute, set) =>
Some(sources.In(a.name, set.toArray))
case Not(InSet(a: Attribute, set)) =>
Some(sources.Not(sources.In(a.name, set.toArray)))
case GreaterThan(a: Attribute, Literal(v, t)) =>
Some(sources.GreaterThan(a.name, v))
case GreaterThan(Literal(v, t), a: Attribute) =>
Some(sources.LessThan(a.name, v))
case c@GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case c@GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case LessThan(a: Attribute, Literal(v, t)) =>
Some(sources.LessThan(a.name, v))
case LessThan(Literal(v, t), a: Attribute) =>
Some(sources.GreaterThan(a.name, v))
case c@LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case c@LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
Some(sources.GreaterThanOrEqual(a.name, v))
case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
Some(sources.LessThanOrEqual(a.name, v))
case c@GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case c@GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
Some(sources.LessThanOrEqual(a.name, v))
case LessThanOrEqual(Literal(v, t), a: Attribute) =>
Some(sources.GreaterThanOrEqual(a.name, v))
case c@LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
CastExpressionOptimization.checkIfCastCanBeRemove(c)
case StartsWith(a: Attribute, Literal(v, t)) =>
Some(sources.StringStartsWith(a.name, v.toString))
case c@EndsWith(a: Attribute, Literal(v, t)) =>
Some(CarbonEndsWith(c))
case c@Contains(a: Attribute, Literal(v, t)) =>
Some(CarbonContainsWith(c))
case c@Cast(a: Attribute, _) =>
Some(CastExpr(c))
case Literal(v, t) if v == null =>
Some(FalseExpr())
case others =>
if (!or) {
others.collect {
case attr: AttributeReference =>
attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
}
None
}
}
// translate is invoked only for its side effect: attributes that cannot be pushed
// down to carbon are accumulated into attrList for evaluation in the spark layer.
filters.foreach(expr => translate(expr))
}
/**
* Checks whether a StringTrim expression is compatible with carbon. Carbon only
* handles trimming of spaces; any other trim character must be rejected. Spark
* versions before 2.3 cannot specify a trim string, so they are always compatible;
* from 2.3 onwards the trimStr field must be empty.
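*
* Illustrative SQL (hypothetical table and column names):
* {{{
* SELECT * FROM t WHERE TRIM(c1) = 'ab'                -- compatible: space trim only
* SELECT * FROM t WHERE TRIM(BOTH 'x' FROM c1) = 'ab'  -- incompatible: trimStr is set
* }}}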
*/
def isStringTrimCompatibleWithCarbon(stringTrim: StringTrim): Boolean = {
if (SparkUtil.isSparkVersionXandAbove("2.3")) {
// Spark 2.3 introduced an optional trimStr; carbon can only trim spaces, so an
// explicit trim string makes the expression incompatible.
val trimStr = CarbonReflectionUtils.getField("trimStr", stringTrim)
.asInstanceOf[Option[Expression]]
trimStr.isEmpty
} else {
true
}
}
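/**
* Transforms a catalyst Expression directly into a carbon Expression tree. Unlike
* createCarbonFilter, which starts from data sources Filters, this walks the catalyst
* tree itself; anything it cannot map is wrapped in a SparkUnknownExpression and
* evaluated row by row on the carbon side.
*/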
def transformExpression(expr: Expression): CarbonExpression = {
expr match {
case Or(left, right)
if isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right) =>
new OrExpression(transformExpression(left), transformExpression(right))
case And(left, right)
if isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right) =>
new AndExpression(transformExpression(left), transformExpression(right))
case EqualTo(left, right)
if isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right) =>
new EqualToExpression(transformExpression(left), transformExpression(right))
case Not(EqualTo(left, right))
if isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right) =>
new NotEqualsExpression(transformExpression(left), transformExpression(right))
case IsNotNull(child) if isCarbonSupportedDataTypes(child) =>
new NotEqualsExpression(transformExpression(child), transformExpression(Literal(null)), true)
case IsNull(child) if isCarbonSupportedDataTypes(child) =>
new EqualToExpression(transformExpression(child), transformExpression(Literal(null)), true)
case Not(In(left, right)) if isCarbonSupportedDataTypes(left) =>
// NOT IN over a list containing null can never be satisfied. The list elements are
// Expressions, so a null literal must be detected via isNullLiteral rather than a
// plain null-reference check.
if (right.exists(isNullLiteral)) {
new FalseExpression(transformExpression(left))
} else {
new NotInExpression(transformExpression(left),
new ListExpression(convertToJavaList(right.map(transformExpression))))
}
case In(left, right) if (isCarbonSupportedDataTypes(left)) =>
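// positionId is carbon's implicit row-address column (used, for example, by
// update/delete); its IN-list is wrapped in an ImplicitExpression so carbon can
// prune blocks directly instead of scanning.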
left match {
case left: AttributeReference if (left.name
.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)) =>
new InExpression(transformExpression(left),
new ImplicitExpression(convertToJavaList(right.filter(_ != null)
.filter(!isNullLiteral(_))
.map(transformExpression))))
case _ =>
new InExpression(transformExpression(left),
new ListExpression(convertToJavaList(right.filter(_ != null).filter(!isNullLiteral(_))
.map(transformExpression))))
}
case InSet(left, right) if (isCarbonSupportedDataTypes(left)) =>
val validData = right.filter(_ != null).map { x =>
val e = Literal(x.toString)
transformExpression(e)
}.toList
new InExpression(transformExpression(left),
new ListExpression(convertToJavaList(validData)))
case Not(InSet(left, right)) if isCarbonSupportedDataTypes(left) =>
if (right.contains(null)) {
new FalseExpression(transformExpression(left))
} else {
// No nulls can remain in this branch, so every value is rendered as a string literal.
val r = right.map(x => transformExpression(Literal(x.toString))).toList
new NotInExpression(transformExpression(left), new ListExpression(convertToJavaList(r)))
}
case GreaterThan(left, right)
if (isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right)) =>
new GreaterThanExpression(transformExpression(left), transformExpression(right))
case GreaterThanOrEqual(left, right)
if (isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right)) =>
new GreaterThanEqualToExpression(transformExpression(left), transformExpression(right))
case LessThan(left, right)
if (isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right)) =>
new LessThanExpression(transformExpression(left), transformExpression(right))
case LessThanOrEqual(left, right)
if (isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right)) =>
new LessThanEqualToExpression(transformExpression(left), transformExpression(right))
case AttributeReference(name, dataType, _, _) =>
new CarbonColumnExpression(name.toString,
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType))
case Literal(value, dataType) =>
new CarbonLiteralExpression(value,
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType))
case StartsWith(left, right@Literal(pattern, dataType))
if pattern.toString.nonEmpty &&
isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right) =>
// Rewrite "col LIKE 'abc%'" as the half-open range ['abc', 'abd'): greater than or
// equal to the prefix, and less than the prefix with its last character incremented.
val l = new GreaterThanEqualToExpression(transformExpression(left),
transformExpression(right))
val maxValueLimit = pattern.toString.substring(0, pattern.toString.length - 1) +
(pattern.toString.charAt(pattern.toString.length - 1).toInt + 1).toChar
val r = new LessThanExpression(
transformExpression(left),
new CarbonLiteralExpression(maxValueLimit,
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType)))
new AndExpression(l, r)
case strTrim: StringTrim if isStringTrimCompatibleWithCarbon(strTrim) =>
// Transforming strTrim itself would recurse into this same case forever; push down
// the trimmed source column instead (assumes, per the compatibility check above,
// that only space trimming is involved).
transformExpression(strTrim.children.head)
case _ =>
new SparkUnknownExpression(expr.transform {
case AttributeReference(name, dataType, _, _) =>
CarbonBoundReference(new CarbonColumnExpression(name.toString,
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType)),
dataType, expr.nullable)
}
)
}
}
private def isNullLiteral(exp: Expression): Boolean = {
// The instanceOf guard must apply to both checks; otherwise a non-Literal (or null)
// expression would be cast unconditionally.
null != exp && exp.isInstanceOf[Literal] &&
(exp.asInstanceOf[Literal].dataType == org.apache.spark.sql.types.DataTypes.NullType ||
exp.asInstanceOf[Literal].value == null)
}
def isCarbonSupportedDataTypes(expr: Expression): Boolean = {
expr.dataType match {
case StringType | IntegerType | LongType | DoubleType |
FloatType | BooleanType | TimestampType => true
case ArrayType(_, _) | StructType(_) | MapType(_, _, _) | DecimalType() => true
case _ => false
}
}
// Converts a Scala list to a Java list. scalaList.asJava cannot be used here because
// the wrapper class is not found while deserializing, which leads to a
// ClassNotFoundException.
private def convertToJavaList(
scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
CarbonSparkDataSourceUtil.convertToJavaList(scalaList)
}
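// Folds a list of predicates into a single conjunction, left to right; for example,
// Seq(a, b, c) becomes Seq(And(And(a, b), c)).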
def preProcessExpressions(expressions: Seq[Expression]): Seq[Expression] = {
expressions match {
case left :: right :: rest => preProcessExpressions(List(And(left, right)) ::: rest)
case _ => expressions
}
}
def getCurrentPartitions(sparkSession: SparkSession,
tableIdentifier: TableIdentifier): Option[Seq[PartitionSpec]] = {
CarbonFilters.getPartitions(
Seq.empty,
sparkSession,
tableIdentifier)
}
def getCurrentPartitions(sparkSession: SparkSession,
carbonTable: CarbonTable): Option[Seq[PartitionSpec]] = {
CarbonFilters.getPartitions(
Seq.empty,
sparkSession,
carbonTable)
}
def getPartitions(partitionFilters: Seq[Expression],
sparkSession: SparkSession,
identifier: TableIdentifier): Option[Seq[PartitionSpec]] = {
val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession)
getPartitions(partitionFilters, sparkSession, carbonTable)
}
/**
* Fetches partition information from the hive metastore.
*
* @param partitionFilters filter expressions used to prune the partitions
* @param sparkSession the active spark session
* @param carbonTable the carbon table whose partitions are fetched
* @return the matching partitions, or None if the table is not hive-partitioned
*/
def getPartitions(partitionFilters: Seq[Expression],
sparkSession: SparkSession,
carbonTable: CarbonTable): Option[Seq[PartitionSpec]] = {
val identifier = TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))
if (!carbonTable.isHivePartitionTable) {
return None
}
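// The pruning strategy is controlled by the CARBON_READ_PARTITION_HIVE_DIRECT
// property (enabled by default): either prune directly through the metastore's
// listPartitionsByFilter, or fall back to listing all partitions and filtering them
// on the driver.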
val partitions = {
try {
if (CarbonProperties.getInstance().
getProperty(CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT,
CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT_DEFAULT).toBoolean) {
// read partitions directly from hive metastore using filters
sparkSession.sessionState.catalog.listPartitionsByFilter(identifier, partitionFilters)
} else {
// Alternatively, read all the partitions first and then filter them
sparkSession.sessionState.catalog.
asInstanceOf[CarbonSessionCatalog].getPartitionsAlternate(
partitionFilters,
sparkSession,
identifier)
}
} catch {
case _: Exception =>
// Fall back to listing all partitions and filtering them on the driver.
sparkSession.sessionState.catalog.
asInstanceOf[CarbonSessionCatalog].getPartitionsAlternate(
partitionFilters,
sparkSession,
identifier)
}
}
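// Render each hive partition as a carbon PartitionSpec, turning the partition spec
// map into hive-style "column=value" path segments.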
Some(partitions.map { partition =>
new PartitionSpec(
new util.ArrayList[String](partition.spec.map { case (column, value) =>
column + "=" + value
}.toList.asJava),
partition.location)
})
}
}