blob: 46d88ceeed206c5b32000f2c97d71dbb3a647b5f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import java.util.{ArrayList, List}
import scala.collection.JavaConverters._
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericInternalRow}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.scan.expression.{ColumnExpression, Expression, ExpressionResult, UnknownExpression}
import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException
import org.apache.carbondata.core.scan.filter.intf.{ExpressionType, RowIntf}
class SparkUnknownExpression(
var sparkExp: SparkExpression,
expressionType: ExpressionType = ExpressionType.UNKNOWN)
extends UnknownExpression with ConditionalExpression {
private var evaluateExpression: (InternalRow) => Any = sparkExp.eval
private var isExecutor: Boolean = false
children.addAll(getColumnList())
override def evaluate(carbonRowInstance: RowIntf): ExpressionResult = {
val values = carbonRowInstance.getValues.toSeq.map {
case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
case d: java.math.BigDecimal => org.apache.spark.sql.types.Decimal.apply(d)
case b: Array[Byte] => org.apache.spark.unsafe.types.UTF8String.fromBytes(b)
case value => value
}
try {
val result = evaluateExpression(
new GenericInternalRow(values.map(a => a.asInstanceOf[Any]).toArray))
val sparkRes = if (isExecutor) {
result.asInstanceOf[InternalRow].get(0, sparkExp.dataType)
} else {
result
}
new ExpressionResult(
CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(sparkExp.dataType),
sparkRes
)
} catch {
case e: Exception => throw new FilterUnsupportedException(e.getMessage)
}
}
override def getFilterExpressionType: ExpressionType = {
expressionType
}
override def getString: String = {
sparkExp.toString()
}
override def getStatement: String = {
sparkExp.toString()
}
def setEvaluateExpression(evaluateExpression: (InternalRow) => Any): Unit = {
this.evaluateExpression = evaluateExpression
isExecutor = true
}
override def findAndSetChild(oldExpr: Expression, newExpr: Expression): Unit = {}
def getColumnList: java.util.List[ColumnExpression] = {
val lst = new java.util.ArrayList[ColumnExpression]()
getColumnListFromExpressionTree(sparkExp, lst)
lst
}
def getLiterals: java.util.List[ExpressionResult] = {
val lst = new java.util.ArrayList[ExpressionResult]()
lst
}
def getAllColumnList: java.util.List[ColumnExpression] = {
val lst = new java.util.ArrayList[ColumnExpression]()
getAllColumnListFromExpressionTree(sparkExp, lst)
lst
}
def isSingleColumn: Boolean = {
val lst = new java.util.ArrayList[ColumnExpression]()
getAllColumnListFromExpressionTree(sparkExp, lst)
if (lst.size == 1 && lst.get(0).isDimension) {
true
} else {
false
}
}
def getColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
lst: java.util.List[ColumnExpression]): Unit = {
sparkCurrentExp match {
case carbonBoundRef: CarbonBoundReference =>
val foundExp = lst.asScala
.find(p => p.getColumnName() == carbonBoundRef.colExp.getColumnName())
if (foundExp.isEmpty) {
carbonBoundRef.colExp.setColIndex(lst.size)
lst.add(carbonBoundRef.colExp)
} else {
carbonBoundRef.colExp.setColIndex(foundExp.get.getColIndex())
}
case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, lst))
}
}
def getAllColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
list: List[ColumnExpression]): List[ColumnExpression] = {
sparkCurrentExp match {
case carbonBoundRef: CarbonBoundReference => list.add(carbonBoundRef.colExp)
case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
}
list
}
def isDirectDictionaryColumns: Boolean = {
val lst = new ArrayList[ColumnExpression]()
getAllColumnListFromExpressionTree(sparkExp, lst)
if (lst.get(0).getCarbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
true
} else {
false
}
}
}