/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
import org.apache.spark.util.SerializableConfiguration
import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.ColumnIdentifier
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.scan.executor.util.QueryUtil
import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
import org.apache.carbondata.spark.rdd.CarbonRDDWithTableInfo
/**
 * It decodes the dictionary-encoded columns of the child plan's output back to
 * their original values.
 */
case class CarbonDictionaryDecoder(
relations: Seq[CarbonDecoderRelation],
profile: CarbonProfile,
aliasMap: CarbonAliasDecoderRelation,
child: SparkPlan,
sparkSession: SparkSession)
extends UnaryExecNode with CodegenSupport {
override val output: Seq[Attribute] =
CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap)
override def outputPartitioning: Partitioning = {
child.outputPartitioning
}
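  // For every output attribute, the (table name, column identifier, dimension) triple of the
  // dictionary column it maps to; entries are (null, null, null) for columns that need no decode.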
val getDictionaryColumnIds: Array[(String, ColumnIdentifier, CarbonDimension)] =
CarbonDictionaryDecoder.getDictionaryColumnMapping(child.output, relations, profile, aliasMap)
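  // Broadcast the Hadoop configuration so that executors can restore the carbon configuration
  // on their threads before loading dictionaries.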
val broadcastConf = SparkSQLUtil.broadCastHadoopConf(
sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
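  // Row-based execution path: decode the dictionary-encoded columns of each partition's rows,
  // or simply pass the child's RDD through when no column needs decoding.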
override def doExecute(): RDD[InternalRow] = {
attachTree(this, "execute") {
val tableNameToCarbonTableMapping = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
(carbonTable.getTableName, carbonTable)
}.toMap
if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
val dataTypes = child.output.map { attr => attr.dataType }
child.execute().mapPartitions { iter =>
ThreadLocalSessionInfo.setConfigurationToCurrentThread(broadcastConf.value.value)
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
val dicts: Seq[Dictionary] = getDictionary(tableNameToCarbonTableMapping,
forwardDictionaryCache)
val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
        // add a task completion listener to clear the dictionaries; clearing them is a
        // decisive factor for the LRU eviction policy
val dictionaryTaskCleaner = TaskContext.get
dictionaryTaskCleaner.addTaskCompletionListener(_ =>
dicts.foreach { dictionary =>
if (null != dictionary) {
dictionary.clear()
}
}
)
new Iterator[InternalRow] {
val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
var flag = true
var total = 0L
override final def hasNext: Boolean = iter.hasNext
override final def next(): InternalRow = {
val row: InternalRow = iter.next()
val data = row.toSeq(dataTypes).toArray
dictIndex.foreach { index =>
if (data(index) != null) {
data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
.getDictionaryValueForKeyInBytes(data(index).asInstanceOf[Int]),
getDictionaryColumnIds(index)._3)
}
}
unsafeProjection(new GenericInternalRow(data))
}
}
}
} else {
child.execute()
}
}
}
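  // Whole-stage codegen path: registers generated helper functions that look up dictionary
  // values and emits code that replaces the surrogate keys of dictionary columns with the
  // decoded values before handing the row to the parent operator.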
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
val tableNameToCarbonTableMapping = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
(carbonTable.getTableName, carbonTable)
}.toMap
if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryWrapper(tableNameToCarbonTableMapping,
forwardDictionaryCache, broadcastConf)
val exprs = child.output.map { exp =>
ExpressionCanonicalizer.execute(BindReferences.bindReference(exp, child.output))
}
ctx.currentVars = input
val dictTuple = exprs.map(e => new DictTuple(null, false))
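      // Register one generated helper per target type; each looks up the surrogate key in the
      // dictionary, handles the member-default (null) value and converts the bytes accordingly.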
val decodeDictionary = ctx.freshName("deDict")
ctx.addNewFunction(decodeDictionary,
s"""
|private org.apache.spark.sql.DictTuple $decodeDictionary(
| org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
| throws java.io.IOException {
| boolean isNull = false;
| byte[] valueIntern = dict.getDictionaryValueForKeyInBytes(surg);
| if (valueIntern == null ||
| java.util.Arrays.equals(org.apache.carbondata.core.constants
| .CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, valueIntern)) {
| isNull = true;
| valueIntern = org.apache.carbondata.core.constants
| .CarbonCommonConstants.ZERO_BYTE_ARRAY;
| }
| return new org.apache.spark.sql.DictTuple(valueIntern, isNull);
|}""".stripMargin)
val decodeDecimal = ctx.freshName("deDictDec")
ctx.addNewFunction(decodeDecimal,
s"""
|private org.apache.spark.sql.DictTuple $decodeDecimal(
| org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
| throws java.io.IOException {
| org.apache.spark.sql.DictTuple tuple = $decodeDictionary(dict, surg);
| tuple.setValue(org.apache.spark.sql.types.Decimal.apply(new java.math.BigDecimal(
| new String((byte[])tuple.getValue(),
| org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET_CLASS))));
| return tuple;
|}""".stripMargin)
val decodeInt = ctx.freshName("deDictInt")
ctx.addNewFunction(decodeInt,
s"""
|private org.apache.spark.sql.DictTuple $decodeInt(
| org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
| throws java.io.IOException {
| org.apache.spark.sql.DictTuple tuple = $decodeDictionary(dict, surg);
| tuple.setValue(Integer.parseInt(new String((byte[])tuple.getValue(),
| org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET_CLASS)));
| return tuple;
|}""".stripMargin)
val decodeShort = ctx.freshName("deDictShort")
ctx.addNewFunction(decodeShort,
s"""
|private org.apache.spark.sql.DictTuple $decodeShort(
| org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
| throws java.io.IOException {
| org.apache.spark.sql.DictTuple tuple = $decodeDictionary(dict, surg);
| tuple.setValue(Short.parseShort(new String((byte[])tuple.getValue(),
| org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET_CLASS)));
| return tuple;
|}""".stripMargin)
val decodeDouble = ctx.freshName("deDictDoub")
ctx.addNewFunction(decodeDouble,
s"""
|private org.apache.spark.sql.DictTuple $decodeDouble(
| org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
| throws java.io.IOException {
| org.apache.spark.sql.DictTuple tuple = $decodeDictionary(dict, surg);
| tuple.setValue(Double.parseDouble(new String((byte[])tuple.getValue(),
| org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET_CLASS)));
| return tuple;
|}""".stripMargin)
val decodeLong = ctx.freshName("deDictLong")
ctx.addNewFunction(decodeLong,
s"""
|private org.apache.spark.sql.DictTuple $decodeLong(
| org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
| throws java.io.IOException {
| org.apache.spark.sql.DictTuple tuple = $decodeDictionary(dict, surg);
| tuple.setValue(Long.parseLong(new String((byte[])tuple.getValue(),
| org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET_CLASS)));
| return tuple;
|}""".stripMargin)
val decodeStr = ctx.freshName("deDictStr")
ctx.addNewFunction(decodeStr,
s"""
|private org.apache.spark.sql.DictTuple $decodeStr(
| org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
| throws java.io.IOException {
| org.apache.spark.sql.DictTuple tuple = $decodeDictionary(dict, surg);
| tuple.setValue(UTF8String.fromBytes((byte[])tuple.getValue()));
| return tuple;
|}""".stripMargin)
val decodeBool = ctx.freshName("deDictBool")
ctx.addNewFunction(decodeBool,
s"""
|private org.apache.spark.sql.DictTuple $decodeBool(
| org.apache.spark.sql.ForwardDictionaryWrapper dict, int surg)
| throws java.io.IOException {
| org.apache.spark.sql.DictTuple tuple = $decodeDictionary(dict, surg);
| tuple.setValue(Boolean.parseBoolean(new String((byte[])tuple.getValue(),
| org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET_CLASS)));
| return tuple;
|}""".stripMargin)
val resultVars = exprs.zipWithIndex.map { case (expr, index) =>
if (dicts(index) != null) {
val ev = expr.genCode(ctx)
val dictRef = ctx.addReferenceObj("df", dicts(index))
val value = ctx.freshName("v")
var code =
s"""
|${ev.code}
""".stripMargin
if (CarbonDataTypes.isDecimal(getDictionaryColumnIds(index)._3.getDataType)) {
code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeDecimal($dictRef, ${ev.value});
""".stripMargin
ExprCode(code, s"$value.getIsNull()",
s"((org.apache.spark.sql.types.Decimal)$value.getValue())")
} else {
getDictionaryColumnIds(index)._3.getDataType match {
case CarbonDataTypes.INT => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeInt($dictRef, ${ ev.value });
""".stripMargin
ExprCode(code, s"$value.getIsNull()", s"((Integer)$value.getValue())")
case CarbonDataTypes.SHORT => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeShort($dictRef, ${ ev.value });
""".stripMargin
ExprCode(code, s"$value.getIsNull()", s"((Short)$value.getValue())")
case CarbonDataTypes.DOUBLE => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeDouble($dictRef, ${ ev.value });
""".stripMargin
ExprCode(code, s"$value.getIsNull()", s"((Double)$value.getValue())")
case CarbonDataTypes.LONG => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeLong($dictRef, ${ ev.value });
""".stripMargin
ExprCode(code, s"$value.getIsNull()", s"((Long)$value.getValue())")
case CarbonDataTypes.BOOLEAN => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeBool($dictRef, ${ ev.value });
""".stripMargin
ExprCode(code, s"$value.getIsNull()", s"((Boolean)$value.getValue())")
case _ => code +=
s"""
|org.apache.spark.sql.DictTuple $value = $decodeStr($dictRef, ${ev.value});
""".stripMargin
ExprCode(code, s"$value.getIsNull()", s"((UTF8String)$value.getValue())")
}
}
} else {
expr.genCode(ctx)
}
}
// Evaluation of non-deterministic expressions can't be deferred.
s"""
|${consume(ctx, resultVars)}
""".stripMargin
} else {
val exprs = child.output.map(x =>
ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output)))
ctx.currentVars = input
val resultVars = exprs.map(_.genCode(ctx))
// Evaluation of non-deterministic expressions can't be deferred.
s"""
|${consume(ctx, resultVars)}
""".stripMargin
}
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
child.asInstanceOf[CodegenSupport].inputRDDs()
}
protected override def doProduce(ctx: CodegenContext): String = {
child.asInstanceOf[CodegenSupport].produce(ctx, this)
}
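  // Fetches the forward dictionaries for all dictionary columns from the cache; a null entry
  // means the column has no dictionary or the lookup failed.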
private def getDictionary(atiMap: Map[String, CarbonTable],
cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
if (f._2 != null) {
try {
cache.get(new DictionaryColumnUniqueIdentifier(
atiMap(f._1).getAbsoluteTableIdentifier, f._2, f._3.getDataType))
} catch {
case _: Throwable => null
}
} else {
null
}
}
dicts
}
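  // Builds serializable dictionary wrappers for the codegen path; for a column created through
  // a parent table relation, the parent table's identifier and column id are used instead.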
private def getDictionaryWrapper(atiMap: Map[String, CarbonTable],
cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary],
broadcastConf: Broadcast[SerializableConfiguration]) = {
val allDictIdentifiers = new ArrayBuffer[DictionaryColumnUniqueIdentifier]()
val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryColumnIds.map {
case (tableName, columnIdentifier, carbonDimension) =>
if (columnIdentifier != null) {
try {
val (newAbsoluteTableIdentifier, newColumnIdentifier) =
if (null != carbonDimension.getColumnSchema.getParentColumnTableRelations &&
!carbonDimension
.getColumnSchema.getParentColumnTableRelations.isEmpty) {
(QueryUtil
.getTableIdentifierForColumn(carbonDimension),
new ColumnIdentifier(carbonDimension.getColumnSchema
.getParentColumnTableRelations.get(0).getColumnId,
carbonDimension.getColumnProperties,
carbonDimension.getDataType))
} else {
(atiMap(tableName).getAbsoluteTableIdentifier, columnIdentifier)
}
val dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(
newAbsoluteTableIdentifier,
newColumnIdentifier, carbonDimension.getDataType)
allDictIdentifiers += dictionaryColumnUniqueIdentifier
new ForwardDictionaryWrapper(dictionaryColumnUniqueIdentifier, broadcastConf)
} catch {
case _: Throwable => null
}
} else {
null
}
}
val dictionaryLoader = new DictionaryLoader(allDictIdentifiers.toList)
dicts.foreach { dict =>
if (dict != null) {
dict.setDictionaryLoader(dictionaryLoader)
}
}
dicts
}
}
object CarbonDictionaryDecoder {
  /**
   * Converts the data types of the attributes as per the decoder plan. If a column needs to be
   * decoded here, its data type is updated from IntegerType back to its original data type.
   */
def convertOutput(output: Seq[Attribute],
relations: Seq[CarbonDecoderRelation],
profile: CarbonProfile,
aliasMap: CarbonAliasDecoderRelation): Seq[Attribute] = {
output.map { a =>
val attr = aliasMap.getOrElse(a, a)
val relation = relations.find(p => p.contains(attr))
if (relation.isDefined && canBeDecoded(attr, profile)) {
val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
val carbonDimension = carbonTable.getDimensionByName(attr.name)
if (carbonDimension != null &&
carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
!carbonDimension.isComplex()) {
val newAttr = AttributeReference(a.name,
SparkTypeConverter.convertCarbonToSparkDataType(carbonDimension.getColumnSchema,
relation.get.carbonRelation.carbonTable),
a.nullable,
a.metadata)(a.exprId).asInstanceOf[Attribute]
newAttr
} else {
a
}
} else {
a
}
}
}
  /**
   * Updates all dictionary-encoded attributes to integer data type.
   */
def updateAttributes(output: Seq[Attribute],
relations: Seq[CarbonDecoderRelation],
aliasMap: CarbonAliasDecoderRelation): Seq[Attribute] = {
output.map { a =>
val attr = aliasMap.getOrElse(a, a)
val relation = relations.find(p => p.contains(attr))
if (relation.isDefined) {
val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
val carbonDimension = carbonTable.getDimensionByName(attr.name)
if (carbonDimension != null &&
carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
!carbonDimension.isComplex()) {
val newAttr = AttributeReference(a.name,
IntegerType,
a.nullable,
a.metadata)(a.exprId).asInstanceOf[Attribute]
newAttr
} else {
a
}
} else {
a
}
}
}
  /**
   * Returns whether the attribute requires decoding or not, based on the profile.
   */
def canBeDecoded(attr: Attribute, profile: CarbonProfile): Boolean = {
profile match {
case ip: IncludeProfile =>
ip.attributes
.exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
case ep: ExcludeProfile =>
!ep.attributes
.exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
case _ => true
}
}
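  /**
   * Maps each output attribute to its (table name, column identifier, dimension) triple if it is
   * a dictionary-encoded (but not direct-dictionary or complex) column, otherwise to nulls.
   */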
def getDictionaryColumnMapping(output: Seq[Attribute],
relations: Seq[CarbonDecoderRelation],
profile: CarbonProfile,
aliasMap: CarbonAliasDecoderRelation): Array[(String, ColumnIdentifier, CarbonDimension)] = {
output.map { attribute =>
val attr = aliasMap.getOrElse(attribute, attribute)
val relation = relations.find(p => p.contains(attr))
if (relation.isDefined && CarbonDictionaryDecoder.canBeDecoded(attr, profile)) {
val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
val carbonDimension =
carbonTable.getDimensionByName(attr.name)
if (carbonDimension != null &&
carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
!carbonDimension.isComplex) {
(carbonTable.getTableName, carbonDimension.getColumnIdentifier,
carbonDimension)
} else {
(null, null, null)
}
} else {
(null, null, null)
}
}.toArray
}
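  /**
   * Returns true if at least one column in the mapping actually needs dictionary decoding.
   */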
def isRequiredToDecode(colIdents: Array[(String, ColumnIdentifier, CarbonDimension)]): Boolean = {
colIdents.find(p => p._1 != null) match {
case Some(_) => true
case _ => false
}
}
}
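/**
 * RDD that decodes the dictionary-encoded columns of its parent RDD's rows back to their
 * original values.
 */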
class CarbonDecoderRDD(
relations: Seq[CarbonDecoderRelation],
profile: CarbonProfile,
aliasMap: CarbonAliasDecoderRelation,
val prev: RDD[InternalRow],
output: Seq[Attribute],
serializedTableInfo: Array[Byte])
extends CarbonRDDWithTableInfo[InternalRow](relations.head.carbonRelation.sparkSession,
prev,
serializedTableInfo) {
def canBeDecoded(attr: Attribute): Boolean = {
profile match {
case ip: IncludeProfile if ip.attributes.nonEmpty =>
ip.attributes
.exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
case ep: ExcludeProfile =>
!ep.attributes
.exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
case _ => true
}
}
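  // Same mapping as CarbonDictionaryDecoder.getDictionaryColumnMapping, computed against this
  // RDD's output attributes.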
val getDictionaryColumnIds = {
val dictIds: Array[(String, ColumnIdentifier, CarbonDimension)] = output.map { a =>
val attr = aliasMap.getOrElse(a, a)
val relation = relations.find(p => p.contains(attr))
if (relation.isDefined && canBeDecoded(attr)) {
val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
val carbonDimension =
carbonTable.getDimensionByName(attr.name)
if (carbonDimension != null &&
carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
!carbonDimension.isComplex()) {
(carbonTable.getTableName, carbonDimension.getColumnIdentifier,
carbonDimension)
} else {
(null, null, null)
}
} else {
(null, null, null)
}
}.toArray
dictIds
}
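  // Rebuilds the CarbonTable from the serialized table info, loads the dictionaries and decodes
  // the dictionary columns of every row produced by the parent partition.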
override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val tableInfo = getTableInfo
val tableNameToCarbonTableMapping = relations.map { _ =>
(tableInfo.getFactTable.getTableName, CarbonTable.buildFromTableInfo(tableInfo))
}.toMap
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
val dicts: Seq[Dictionary] = getDictionary(tableNameToCarbonTableMapping,
forwardDictionaryCache)
val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
    // add a task completion listener to clear the dictionaries; clearing them is a
    // decisive factor for the LRU eviction policy
val dictionaryTaskCleaner = TaskContext.get
dictionaryTaskCleaner.addTaskCompletionListener(_ =>
dicts.foreach { dictionary =>
if (null != dictionary) {
dictionary.clear()
}
}
)
val iter = firstParent[InternalRow].iterator(split, context)
new Iterator[InternalRow] {
var flag = true
var total = 0L
val dataTypes = output.map { attr => attr.dataType }
override final def hasNext: Boolean = iter.hasNext
override final def next(): InternalRow = {
val row: InternalRow = iter.next()
val data = row.toSeq(dataTypes).toArray
dictIndex.foreach { index =>
if (data(index) != null) {
data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
.getDictionaryValueForKeyInBytes(data(index).asInstanceOf[Int]),
getDictionaryColumnIds(index)._3)
}
}
new GenericInternalRow(data)
}
}
}
private def getDictionary(atiMap: Map[String, CarbonTable],
cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
if (f._2 != null) {
try {
cache.get(new DictionaryColumnUniqueIdentifier(
atiMap(f._1).getAbsoluteTableIdentifier, f._2, f._3.getDataType))
} catch {
case _: Throwable => null
}
} else {
null
}
}
dicts
}
override protected def internalGetPartitions: Array[Partition] =
firstParent[InternalRow].partitions
}
/**
 * A wrapper around Dictionary; it is a workaround to keep the dictionary serializable in
 * the codegen case.
 *
 * @param dictIdentifier dictionary column unique identifier
 * @param broadcastConf broadcast Hadoop configuration used for serialization; it carries the carbon configuration.
 */
class ForwardDictionaryWrapper(
dictIdentifier: DictionaryColumnUniqueIdentifier,
broadcastConf: Broadcast[SerializableConfiguration]) extends Serializable {
var dictionary: Dictionary = null
var dictionaryLoader: DictionaryLoader = _
def getDictionaryValueForKeyInBytes (surrogateKey: Int): Array[Byte] = {
if (dictionary == null) {
      // Note: in the code generated by doConsume(), this is the first method called on the
      // wrapper, so the configuration is set on the thread-local here. If any new method is
      // added and called before this one, the configuration must be set there instead.
ThreadLocalSessionInfo.setConfigurationToCurrentThread(broadcastConf.value.value)
dictionary = dictionaryLoader.getDictionary(dictIdentifier)
}
dictionary.getDictionaryValueForKeyInBytes(surrogateKey)
}
def setDictionaryLoader(loader: DictionaryLoader): Unit = {
dictionaryLoader = loader
}
def clear(): Unit = {
if (dictionary == null) {
dictionary = dictionaryLoader.getDictionary(dictIdentifier)
}
dictionary.clear()
}
}
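/**
 * Holder for a decoded dictionary value and its null flag, used by the generated code.
 */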
class DictTuple(var value: AnyRef, var isNull: Boolean) extends Serializable {
def getValue: AnyRef = value
def getIsNull: Boolean = isNull
def setValue(value: AnyRef): Unit = {
this.value = value
}
def setIsNull(isNull: Boolean): Unit = {
this.isNull = isNull
}
}
/**
 * Dictionary loader that loads all dictionaries at once instead of one by one.
 */
class DictionaryLoader(allDictIdentifiers: List[DictionaryColumnUniqueIdentifier])
extends Serializable {
var isDictionaryLoaded = false
var allDicts : java.util.List[Dictionary] = _
private def loadDictionary(): Unit = {
if (!isDictionaryLoaded) {
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
allDicts = forwardDictionaryCache.getAll(allDictIdentifiers.asJava)
isDictionaryLoaded = true
val dictionaryTaskCleaner = TaskContext.get
if (dictionaryTaskCleaner != null) {
dictionaryTaskCleaner.addTaskCompletionListener(_ =>
allDicts.asScala.foreach { dictionary =>
if (null != dictionary) {
dictionary.clear()
}
}
)
}
}
}
def getDictionary(dictIdent: DictionaryColumnUniqueIdentifier): Dictionary = {
if (!isDictionaryLoaded) {
loadDictionary()
}
val findValue = allDictIdentifiers.zipWithIndex.find(p => p._1.equals(dictIdent)).get
allDicts.get(findValue._2)
}
}