blob: 9ad76a79b254cbba3643bc68653f6a35b1174322 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.spark.util
import java.{lang, util}
import java.lang.ref.Reference
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable
import scala.util.Try
import com.univocity.parsers.common.TextParsingException
import org.apache.log4j.Logger
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.command.{Field, UpdateTableModel}
import org.apache.spark.sql.types._
import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, IndexSchema}
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, ColumnSchema}
import org.apache.carbondata.core.util.{ByteUtil, DataTypeUtil}
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
import org.apache.carbondata.streaming.parser.FieldConverter
object CarbonScalaUtil {
// Logger shared by this utility object; obtained from LogServiceFactory using the object's
// canonical class name.
private val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
def getString(
row: Row,
idx: Int,
carbonLoadModel: CarbonLoadModel,
serializationNullFormat: String,
complexDelimiters: util.ArrayList[String],
timeStampFormat: SimpleDateFormat,
dateFormat: SimpleDateFormat,
isVarcharType: Boolean = false,
isComplexType: Boolean = false,
level: Int = 0): String = {
// Renders the value at position `idx` of `row` as a String via FieldConverter.objectToString.
try {
// NOTE(review): the argument list below ends on a trailing comma and is never closed in this
// revision - confirm which argument(s) follow `level`.
FieldConverter.objectToString(row.get(idx), serializationNullFormat, complexDelimiters,
timeStampFormat, dateFormat, isVarcharType, isComplexType, level,
} catch {
case e: Exception =>
// Only the failure whose message starts with FieldConverter.stringLengthExceedErrorMsg is
// translated into a column-specific message; every other exception is rethrown unchanged.
// NOTE(review): e.getMessage can be null, in which case startsWith would throw - confirm.
if (e.getMessage.startsWith(FieldConverter.stringLengthExceedErrorMsg)) {
// The column name is resolved by position from the table's create-order column list.
val msg = s"Column ${carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
.getCreateOrderColumn.get(idx).getColName} is too long," +
s" consider to use 'long_string_columns' table property."
LOGGER.error(msg, e)
throw new Exception(msg, e)
} else {
throw e
/**
 * Converts the incoming value according to its data type and returns it as a String.
 *
 * @param value Input value to convert
 * @param dataType Data type the value is converted to before being rendered as a String
 * @param timeStampFormat Timestamp format used for timestamp data types
 * @param dateFormat Date format used for the DateType data type
 * @return converted String
 */
def convertToDateAndTimeFormats(
value: String,
dataType: DataType,
timeStampFormat: SimpleDateFormat,
dateFormat: SimpleDateFormat): String = {
// True when the caller passed the Hive default-partition marker (compared case-insensitively).
val defaultValue = value != null && value.equalsIgnoreCase(hiveDefaultPartition)
try {
dataType match {
// Timestamp and date values are only handled specially when a format was supplied;
// otherwise they fall through to the generic branch.
case TimestampType if timeStampFormat != null =>
if (defaultValue) {
// The default-partition marker is replaced by the current time in the given format.
timeStampFormat.format(new Date())
} else {
// NOTE(review): the non-default conversion for this branch is not present in this revision.
case DateType if dateFormat != null =>
if (defaultValue) {
// Same substitution as above, using the date format.
dateFormat.format(new Date())
} else {
// NOTE(review): the non-default conversion for this branch is not present in this revision.
case _ =>
val convertedValue =
if (convertedValue == null) {
if (defaultValue) {
// For the default-partition marker a placeholder is returned: "false" for booleans and
// "0" for every other data type.
return dataType match {
case BooleanType => "false"
case _ => "0"
throw new MalformedCarbonCommandException(
s"Value $value with datatype $dataType on static partition is not correct")
} catch {
// Any failure is reported as a malformed command.
// NOTE(review): the caught exception `e` is not attached as the cause, so the original
// stack trace is lost - confirm this is intended.
case e: Exception =>
throw new MalformedCarbonCommandException(
s"Value $value with datatype $dataType on static partition is not correct")
/**
 * Converts the incoming static partition value according to its data type and returns the
 * converted value as an object.
 *
 * @param value Input value to convert
 * @param dataType Data type the value is converted to
 * @param timeStampFormat Timestamp format used for timestamp data types
 * @param dateFormat Date format used for the DateType data type
 * @return converted value
 */
def convertStaticPartitionToValues(
value: String,
dataType: DataType,
timeStampFormat: SimpleDateFormat,
dateFormat: SimpleDateFormat): AnyRef = {
// True when the caller passed the Hive default-partition marker (compared case-insensitively).
val defaultValue = value != null && value.equalsIgnoreCase(hiveDefaultPartition)
try {
dataType match {
// Timestamp and date values are only handled specially when a format was supplied.
case TimestampType if timeStampFormat != null =>
val formattedString =
if (defaultValue) {
// The default-partition marker is replaced by the current time in the given format.
timeStampFormat.format(new Date())
} else {
val convertedValue =
// NOTE(review): the expression assigned to convertedValue is not present in this revision.
case DateType if dateFormat != null =>
val formattedString =
if (defaultValue) {
dateFormat.format(new Date())
} else {
val convertedValue =
// The converted value is treated as a Long time value and mapped through
// generateDictionaryKey.
val date = generateDictionaryKey(convertedValue.asInstanceOf[Long])
case BinaryType =>
// TODO: decode required ? currently it is working
case _ =>
val convertedValue =
if (convertedValue == null) {
if (defaultValue) {
// For the default-partition marker a placeholder object is returned: boxed `false` for
// booleans and boxed `0` for every other data type.
dataType match {
case BooleanType =>
return false.asInstanceOf[AnyRef]
case _ =>
return 0.asInstanceOf[AnyRef]
throw new MalformedCarbonCommandException(
s"Value $value with datatype $dataType on static partition is not correct")
} catch {
// Any failure is reported as a malformed command.
// NOTE(review): the caught exception `e` is not attached as the cause - confirm intended.
case e: Exception =>
throw new MalformedCarbonCommandException(
s"Value $value with datatype $dataType on static partition is not correct")
def generateDictionaryKey(timeValue: Long): Int = {
// Maps a time value to an Int key for date columns. Values outside the inclusive range
// [DateDirectDictionaryGenerator.MIN_VALUE, DateDirectDictionaryGenerator.MAX_VALUE] yield
// CarbonCommonConstants.DIRECT_DICT_VALUE_NULL.
if (timeValue < DateDirectDictionaryGenerator.MIN_VALUE ||
timeValue > DateDirectDictionaryGenerator.MAX_VALUE) {
if (LOGGER.isDebugEnabled) {
LOGGER.debug("Value for date type column is not in valid range. Value considered as null.")
return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL
// In-range values are divided by MILLIS_PER_DAY and floored, i.e. converted to a whole day
// count, before an addend is applied.
// NOTE(review): the expression ends on a dangling `+`; the addend is not present in this
// revision - confirm.
Math.floor(timeValue.toDouble / DateDirectDictionaryGenerator.MILLIS_PER_DAY).toInt +
/**
 * Converts the incoming value according to the data type of its column and returns it as a
 * String.
 *
 * @param value Input value to convert
 * @param column column to which the value belongs
 * @return converted String
 */
def convertToCarbonFormat(
value: String,
column: CarbonColumn): String = {
// Dispatches on the column's Carbon data type; types other than TIMESTAMP and DATE return the
// value unchanged.
try {
column.getDataType match {
case CarbonDataTypes.TIMESTAMP =>
// The value is parsed as a Long and multiplied by 1000 before being formatted.
// NOTE(review): presumably this converts milliseconds to the microseconds expected by
// DateTimeUtils.timestampToString - confirm.
DateTimeUtils.timestampToString(value.toLong * 1000)
case CarbonDataTypes.DATE =>
// NOTE(review): the arguments of this call and both branch bodies below are not present in
// this revision.
val date = DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
if (date == null) {
} else {
case _ => value
} catch {
// NOTE(review): the handler body is not present in this revision.
case e: Exception =>
/**
 * Converts the incoming static partition value according to the data type of its column and
 * returns it as a String.
 *
 * @param value Input value to convert
 * @param column column schema to which the value belongs
 * @return converted String
 */
def convertStaticPartitions(
value: String,
column: ColumnSchema): String = {
// Converts a static partition value based on the column's Carbon data type; TIMESTAMP and DATE
// are routed through a direct-dictionary generator, other types return the value unchanged.
// NOTE(review): several statements of this body (call arguments, branch bodies, closing
// braces) are not present in this revision, so the exact control flow cannot be confirmed.
try {
if (column.getDataType.equals(CarbonDataTypes.DATE)) {
if (column.getDataType.equals(CarbonDataTypes.TIMESTAMP)) {
return DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
} else if (column.getDataType.equals(CarbonDataTypes.DATE)) {
return DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
column.getDataType match {
case CarbonDataTypes.TIMESTAMP =>
case CarbonDataTypes.DATE =>
case _ => value
} catch {
// NOTE(review): the handler body is not present in this revision.
case e: Exception =>
// Marker string that stands for the Hive default partition; the conversion helpers in this
// object compare partition values against it.
private val hiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"
/**
 * Updates the partition values to the right date and time format.
 *
 * @return updated partition spec
 */
// NOTE(review): the expression that the `{ case (col, pValue) => ... }` function is applied to
// is not present on the line below in this revision - confirm against the full source.
def updatePartitions(partitionSpec: mutable.LinkedHashMap[String, String],
table: CarbonTable): mutable.LinkedHashMap[String, String] = { { case (col, pValue) =>
// replace special string with empty value.
// NOTE(review): the values produced by the three branches below are not present in this
// revision.
val value = if (pValue == null) {
} else if (pValue.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
} else {
// Column lookup and every emitted key use the lower-cased column name.
val carbonColumn = table.getColumnByName(col.toLowerCase)
try {
// The Hive default-partition marker is passed through untouched.
if (value.equals(hiveDefaultPartition)) {
(col.toLowerCase, value)
} else {
// A null conversion result is mapped to the Hive default-partition marker.
val convertedString = convertToCarbonFormat(value, carbonColumn)
if (convertedString == null) {
(col.toLowerCase, hiveDefaultPartition)
} else {
(col.toLowerCase, convertedString)
} catch {
// Best effort: when conversion fails the unconverted value is kept.
case e: Exception =>
(col.toLowerCase, value)
/**
 * Updates the partition values to the right date and time format.
 */
// NOTE(review): the expression that the `{ f => ... }` function is applied to is not present
// in this revision - confirm against the full source.
def updatePartitions(
parts: Seq[CatalogTablePartition],
table: CarbonTable): Seq[CatalogTablePartition] = { { f =>
// Copy the partition spec into an insertion-ordered map keyed by lower-cased column name.
val specLinkedMap: mutable.LinkedHashMap[String, String] = mutable.LinkedHashMap
.empty[String, String]
f.spec.foreach(fSpec => specLinkedMap.put(fSpec._1.toLowerCase, fSpec._2))
// NOTE(review): the expression assigned to changedSpec is not present in this revision.
val changedSpec =
f.copy(spec = changedSpec)
// Partitions whose spec is equal are collapsed to the first one of each group.
}.groupBy(p => p.spec).map(f => f._2.head).toSeq // Avoid duplicates by do group by
/**
 * Returns all fields except the tupleId field, as it is not required in the value.
 *
 * @param fields schema fields to convert
 * @return one Column per field, excluding the tupleId field
 */
def getAllFieldsWithoutTupleIdField(fields: Array[StructField]): Seq[Column] = {
// getting all fields except tupleId field as it is not required in the value
// NOTE(review): the filter predicate and the Column constructor arguments are not present in
// this revision - confirm how the tupleId field is recognised.
val otherFields = fields.toSeq
.filter(field => !
.map(field => {
new Column(
/**
 * If the table is from an old store then the table parameters are in lowercase, while the
 * current code reads the parameters in camel case. This method reassembles the serialized
 * parameter parts and returns them under their camel-case keys.
 *
 * The key names are read from `spark.sql.sources.options.keys.part.<i>`; the value of each key
 * is the concatenation of `<key in lowercase>.part<j>` for `j` below
 * `<key in lowercase>.numparts`.
 *
 * NOTE(review): the results of the two outer branches (`parameters` when nothing is
 * serialized, `finalProperties.toMap` otherwise) were reconstructed from the declared return
 * type - confirm against the upstream source.
 *
 * @param parameters raw table parameters as stored in the catalog
 * @return `parameters` unchanged when no key parts are recorded, otherwise the reassembled
 *         properties
 * @throws NoSuchElementException if a referenced part, or the `tableName` property, is missing
 * @throws NumberFormatException if a `numparts` entry is not an integer
 */
def getDeserializedParameters(parameters: Map[String, String]): Map[String, String] = {
  val keyParts = parameters.getOrElse("spark.sql.sources.options.keys.numparts", "0").toInt
  if (keyParts == 0) {
    // Nothing was serialized in parts, so there is nothing to reassemble.
    parameters
  } else {
    val keyStr = (0 until keyParts).map { i =>
      parameters(s"spark.sql.sources.options.keys.part.$i")
    }
    val finalProperties = scala.collection.mutable.Map.empty[String, String]
    keyStr.foreach { key =>
      val lowerKey = key.toLowerCase()
      val partCount = parameters(lowerKey + ".numparts").toInt
      val value = (0 until partCount).map(part => parameters(lowerKey + ".part" + part)).mkString
      finalProperties.put(key, value)
    }
    // Database name would be extracted from the parameter first. There can be a scenario where
    // the dbName is not written to the old schema therefore to be on a safer side we are
    // extracting dbName from tableName if it exists.
    // String.split takes a regular expression: an unescaped "." matches every character and
    // produces an empty array, so the dot must be escaped. The -1 limit keeps trailing empty
    // strings, which guarantees at least one element.
    val dbAndTableName = finalProperties("tableName").split("\\.", -1)
    if (dbAndTableName.length > 1) {
      finalProperties.put("dbName", dbAndTableName(0))
      finalProperties.put("tableName", dbAndTableName(1))
    } else {
      finalProperties.put("tableName", dbAndTableName(0))
    }
    // Overriding the 'tablePath' in case 'tablepath' already exists. This will happen when old
    // table schema is updated by the new code then both `path` and `tablepath` will exist. In
    // this case use tablepath
    parameters.get("tablepath").foreach(tablePath => finalProperties.put("tablePath", tablePath))
    finalProperties.toMap
  }
}
/**
 * Retrieves the error message from the exception.
 */
def retrieveAndLogErrorMsg(ex: Throwable, logger: Logger): (String, String) = {
// Builds a pair (executorMessage, errorMessage) from `ex`. errorMessage starts as
// "DataLoad failure" and gets the executor-side message appended when one can be extracted.
// NOTE(review): in the code present here `logger` is only forwarded to the recursive call.
var errorMessage = "DataLoad failure"
var executorMessage = ""
if (ex != null) {
ex match {
// For a SparkException the message is taken from its cause, chosen by the cause's type.
case sparkException: SparkException =>
if (sparkException.getCause.isInstanceOf[IOException]) {
// An IOException wrapping a MetadataProcessException reports the inner message.
if (sparkException.getCause.getCause.isInstanceOf[MetadataProcessException]) {
executorMessage = sparkException.getCause.getCause.getMessage
errorMessage = errorMessage + ": " + executorMessage
} else {
executorMessage = sparkException.getCause.getMessage
errorMessage = errorMessage + ": " + executorMessage
} else if (sparkException.getCause.isInstanceOf[DataLoadingException] ||
sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
executorMessage = sparkException.getCause.getMessage
errorMessage = errorMessage + ": " + executorMessage
} else if (sparkException.getCause.isInstanceOf[TextParsingException]) {
// NOTE(review): the method invoked on CarbonDataProcessorUtil is not present in this
// revision. This branch also joins with " : " where the others use ": ".
executorMessage = CarbonDataProcessorUtil
errorMessage = errorMessage + " : " + executorMessage
} else if (sparkException.getCause.isInstanceOf[SparkException]) {
// Nested SparkExceptions are unwrapped recursively.
val (executorMsgLocal, errorMsgLocal) =
retrieveAndLogErrorMsg(sparkException.getCause, logger)
executorMessage = executorMsgLocal
errorMessage = errorMsgLocal
// Analysis errors are not translated; they are rethrown to the caller.
case aex: AnalysisException =>
throw aex
case uoe: UnsupportedOperationException =>
executorMessage = uoe.getMessage
errorMessage = errorMessage + ":" + executorMessage
// Any other throwable contributes its cause's message, when it has a cause.
case _ =>
if (ex.getCause != null) {
executorMessage = ex.getCause.getMessage
errorMessage = errorMessage + ": " + executorMessage
(executorMessage, errorMessage)
/**
 * Updates the error details inside the update model.
 */
def updateErrorInUpdateModel(updateModel: UpdateTableModel, executorMessage: String): Unit = {
// Records an executor failure on the update model when no failure cause has been set yet.
// NOTE(review): closing braces are not present in this revision, so whether the message
// assignment below is nested inside this condition cannot be confirmed from here.
if (updateModel.executorErrors.failureCauses == FailureCauses.NONE) {
updateModel.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
// Use the executor's message when one is available, otherwise a generic message.
if (null != executorMessage && !executorMessage.isEmpty) {
updateModel.executorErrors.errorMsg = executorMessage
} else {
updateModel.executorErrors.errorMsg = "Update failed as the data load has failed."
/**
 * Generates the number used as the partition number part of a file name.
 *
 * The result is the decimal concatenation of three offset components:
 * `100 + segmentId`, `100000 + taskId` and `100000 + partitionNumber`.
 *
 * @param taskId task id
 * @param segmentId segment id; must be parseable as an Int
 * @param partitionNumber partition number; must not be null
 * @return the three offset components concatenated as a String
 * @throws NumberFormatException if `segmentId` is not an integer
 */
def generateUniqueNumber(taskId: Int,
    segmentId: String,
    partitionNumber: lang.Long): String = {
  // Offsets previously computed as Math.pow(10, 2).toInt and Math.pow(10, 5).toInt.
  // NOTE(review): presumably they keep small ids at a uniform number of digits - confirm.
  val segmentOffset = 100
  val taskOffset = 100000
  val partitionOffset = 100000
  String.valueOf(segmentOffset + segmentId.toInt) +
    String.valueOf(taskOffset + taskId) +
    String.valueOf(partitionNumber + partitionOffset)
}
/**
 * Uses reflection to clean the parser objects that are set in thread locals, to avoid memory
 * issues.
 */
def cleanParserThreadLocals(): Unit = {
// Walks the current thread's `inheritableThreadLocals` table through reflection, looking for
// entries whose ThreadLocal class name starts with "scala.util.DynamicVariable".
// NOTE(review): this depends on private JDK fields (Thread.inheritableThreadLocals,
// ThreadLocalMap.table, Reference.referent); no setAccessible calls are present in this
// revision - confirm how access is granted.
try {
// Get a reference to the thread locals table of the current thread
val thread = Thread.currentThread
val threadLocalsField = classOf[Thread].getDeclaredField("inheritableThreadLocals")
val threadLocalTable = threadLocalsField.get(thread)
// Get a reference to the array holding the thread local variables inside the
// ThreadLocalMap of the current thread
val threadLocalMapClass = Class.forName("java.lang.ThreadLocal$ThreadLocalMap")
val tableField = threadLocalMapClass.getDeclaredField("table")
val table = tableField.get(threadLocalTable)
// The key to the ThreadLocalMap is a WeakReference object. The referent field of this object
// is a reference to the actual ThreadLocal variable
val referentField = classOf[Reference[Thread]].getDeclaredField("referent")
var i = 0
while (i < lang.reflect.Array.getLength(table)) {
// Each entry in the table array of ThreadLocalMap is an Entry object
val entry = lang.reflect.Array.get(table, i)
if (entry != null) {
// Get a reference to the thread local object and remove it from the table
val threadLocal = referentField.get(entry).asInstanceOf[ThreadLocal[_]]
// NOTE(review): the action taken on a matching thread local is not present in this revision.
if (threadLocal != null &&
threadLocal.getClass.getName.startsWith("scala.util.DynamicVariable")) {
i += 1
} catch {
// Deliberate best effort: a failed cleanup must not fail the caller.
case e: Exception =>
// ignore it
/**
 * Creates the indexSchema provider using the class name.
 */
def createIndexProvider(
className: String,
sparkSession: SparkSession,
table: CarbonTable,
schema: IndexSchema): Object = {
// NOTE(review): the body of this method is not present in this revision; only the signature
// can be documented. It takes the provider class name, the session, the table and the index
// schema, and returns the created provider as a plain Object.
/**
 * Validates the local dictionary columns configuration.
 *
 * @param tableProperties table properties of the table being created or altered
 * @param localDictColumns column names configured for local dictionary
 */
def validateLocalDictionaryColumns(tableProperties: mutable.Map[String, String],
localDictColumns: Seq[String]): Unit = {
// Rejects the configuration when the same column name is listed more than once.
// check if the duplicate columns are specified in table schema
// The distinct list being shorter than the original list means at least one duplicate exists.
if (localDictColumns.distinct.lengthCompare(localDictColumns.size) != 0) {
// NOTE(review): the rest of the duplicateColumns expression and the first part of errMsg are
// not present in this revision.
val duplicateColumns = localDictColumns
val errMsg =
duplicateColumns.mkString(",") +
". Please check the DDL."
throw new MalformedCarbonCommandException(errMsg)
/**
 * Validates the local dictionary enable property.
 *
 * @param localDictionaryEnable configured value of the local dictionary enable property
 * @return validation result
 */
def validateLocalDictionaryEnable(localDictionaryEnable: String): Boolean = {
// The property is checked by attempting to parse it as a Boolean.
// NOTE(review): the results of both cases are not present in this revision.
Try(localDictionaryEnable.toBoolean) match {
case scala.util.Success(value) =>
case scala.util.Failure(ex) =>
/**
 * Validates the local dictionary threshold property.
 *
 * @param localDictionaryThreshold configured value of the local dictionary threshold property
 * @return validation result
 */
def validateLocalDictionaryThreshold(localDictionaryThreshold: String): Boolean = {
// if any invalid value is configured for LOCAL_DICTIONARY_THRESHOLD, then default value
// will be
// considered which is 1000
// The property is checked by parsing it as an Int and testing it against the inclusive range
// [LOCAL_DICTIONARY_MIN, LOCAL_DICTIONARY_MAX].
// NOTE(review): the results of the individual branches are not present in this revision.
Try(localDictionaryThreshold.toInt) match {
case scala.util.Success(value) =>
if (value < CarbonCommonConstants.LOCAL_DICTIONARY_MIN ||
value > CarbonCommonConstants.LOCAL_DICTIONARY_MAX) {
} else {
case scala.util.Failure(ex) =>
/**
 * Validates that local dictionary include and exclude do not contain the same column.
 *
 * @param tableProperties table properties holding the include and exclude configuration
 */
def validateDuplicateColumnsForLocalDict(tableProperties: mutable.Map[String, String]): Unit = {
// Throws when a column appears in both LOCAL_DICTIONARY_INCLUDE and LOCAL_DICTIONARY_EXCLUDE.
// NOTE(review): the expressions on the next four assignments are cut short in this revision
// (the lookups and any splitting of the property values are not present).
val isLocalDictIncludeDefined = tableProperties
val isLocalDictExcludeDefined = tableProperties
if (isLocalDictIncludeDefined && isLocalDictExcludeDefined) {
val localDictIncludeCols = tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE)
val localDictExcludeCols = tableProperties(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE)
localDictIncludeCols.foreach { distCol =>
// Membership is tested case-insensitively against the trimmed include entry.
if (localDictExcludeCols.exists(x => x.equalsIgnoreCase(distCol.trim))) {
// The reported duplicates are the entries left after removing one occurrence of each
// distinct name from the combined list.
val duplicateColumns = (localDictIncludeCols ++ localDictExcludeCols)
.diff((localDictIncludeCols ++ localDictExcludeCols).distinct).distinct
val errMsg = "Column ambiguity as duplicate column(s):" +
duplicateColumns.mkString(",") +
" is present in LOCAL_DICTIONARY_INCLUDE " +
"and LOCAL_DICTIONARY_EXCLUDE. Duplicate columns are not allowed."
throw new MalformedCarbonCommandException(errMsg)
/**
 * Validates all the child columns of a complex column recursively to check whether any of
 * the child columns is of string data type.
 *
 * @param field complex column whose children are checked
 */
def validateChildColumnsRecursively(field: Field): Boolean = {
// Only fields with a defined, non-null children list are inspected; the result is whether any
// child satisfies the per-child check.
// NOTE(review): the per-child branch results and the result for a field without children are
// not present in this revision.
if (field.children.isDefined && null != field.children.get) {
field.children.get.exists { childColumn =>
if (childColumn.children.isDefined && null != childColumn.children.get) {
} else {
} else {
/**
 * Validates the columns configured for local dictionary.
 *
 * @param fields fields of the table schema
 * @param tableProperties table properties holding the local dictionary configuration
 */
def validateLocalConfiguredDictionaryColumns(fields: Seq[Field],
tableProperties: mutable.Map[String, String], localDictColumns: Seq[String]): Unit = {
// Runs three checks over the configured columns: each must exist in the schema, must have a
// supported data type, and (for complex columns) must satisfy the child-column check.
// validate the local dict columns
validateLocalDictionaryColumns(tableProperties, localDictColumns)
// check if the column specified exists in table schema
localDictColumns.foreach { distCol =>
if (!fields.exists(x => x.column.equalsIgnoreCase(distCol.trim))) {
val errorMsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE column: " + distCol.trim +
" does not exist in table. Please check the DDL."
throw new MalformedCarbonCommandException(errorMsg)
// check if column is other than STRING or VARCHAR datatype
// NOTE(review): the existence check above trims the configured name, the checks below compare
// it untrimmed - confirm this difference is intended.
localDictColumns.foreach { dictColumn =>
// Accepted data types are STRING, VARCHAR, STRUCT, MAP and ARRAY (case-insensitive).
if (fields
.exists(x => x.column.equalsIgnoreCase(dictColumn) &&
!x.dataType.get.equalsIgnoreCase("STRING") &&
!x.dataType.get.equalsIgnoreCase("VARCHAR") &&
!x.dataType.get.equalsIgnoreCase("STRUCT") &&
!x.dataType.get.equalsIgnoreCase("MAP") &&
!x.dataType.get.equalsIgnoreCase("ARRAY"))) {
// A BINARY column is singled out when it is named in local_dictionary_exclude and not in
// local_dictionary_include.
// NOTE(review): the statement at the end of this condition is cut short in this revision,
// and the definition of errorMsg used below is not present.
if (fields.exists(x => x.column.equalsIgnoreCase(dictColumn)
&& x.dataType.get.equalsIgnoreCase("BINARY"))
&& tableProperties.get("local_dictionary_exclude").nonEmpty
&& tableProperties.get("local_dictionary_exclude").get.contains(dictColumn)
&& (tableProperties.get("local_dictionary_include").isEmpty
|| (!tableProperties.get("local_dictionary_include").get.contains(dictColumn)))) {"Local_dictionary_exclude supports binary")
} else {
dictColumn.trim +
" is not a string/complex/varchar datatype column. LOCAL_DICTIONARY_COLUMN" +
" should be no dictionary string/complex/varchar datatype column." +
"Please check the DDL."
throw new MalformedCarbonCommandException(errorMsg)
// Validate whether any of the child columns of complex dataType column is a string column
localDictColumns.foreach { dictColumn =>
if (fields
.exists(x => x.column.equalsIgnoreCase(dictColumn) && x.children.isDefined &&
null != x.children.get && !validateChildColumnsRecursively(x))) {
// NOTE(review): the message below reads as a double negative ("None ... are not") - confirm
// the intended wording before changing this user-facing text.
val errMsg =
s"None of the child columns of complex dataType column $dictColumn specified in " +
"local_dictionary_include are not of string dataType."
throw new MalformedCarbonCommandException(errMsg)
/**
 * Inserts the given column into the SORT_COLUMNS table property.
 *
 * - If no sort columns are configured (property absent or empty), `column` becomes the only
 *   sort column.
 * - If the configured sort columns already contain `column`, the property is written back
 *   unchanged.
 * - Otherwise `column` is inserted immediately before the first configured sort column that
 *   is listed in `insertBefore`, so that sorting on `column` happens before any of those
 *   columns; when none of them is a sort column, `column` is appended at the end.
 *
 * Column names are compared exactly (case-sensitive) after trimming the configured entries.
 *
 * @param column Name of the column to be inserted into the sort columns
 * @param insertBefore Columns before which the given column should be inserted
 * @param tableProperties Table properties; updated in place
 */
def insertColumnToSortColumns(column: String, insertBefore: Array[String],
    tableProperties: mutable.Map[String, String]): Unit = {
  val updatedSortColumns = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS) match {
    case Some(configured) if !configured.isEmpty =>
      val sortColumns = configured.split(",").map(_.trim)
      if (sortColumns.contains(column)) {
        // Already a sort column: keep the configured value as is.
        configured
      } else {
        // Positions, within the sort columns, of every insertBefore column that is present.
        val positions = insertBefore
          .map(colName => sortColumns.indexWhere(_.equals(colName)))
          .filter(_ >= 0)
        if (positions.nonEmpty) {
          // Insert just before the earliest of those columns.
          val (leading, trailing) = sortColumns.splitAt(positions.min)
          (leading ++ Array(column) ++ trailing).mkString(",")
        } else {
          // None of the insertBefore columns is a sort column: append to the existing list.
          configured + s",$column"
        }
      }
    case _ =>
      // Sort columns are not configured: use the column as the only sort column.
      column
  }
  tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, updatedSortColumns)
}
/**
 * Tells whether the given Spark data type is the string type.
 *
 * @param dataType data type to test
 * @return true only when `dataType` equals `StringType`
 */
def isStringDataType(dataType: DataType): Boolean = {
  dataType == StringType
}
/**
 * Rearranges the column schemas so that all the sort columns come first. In case of
 * ALTER ADD COLUMNS a newly added sort column would otherwise be last, but SORT_COLUMNS are
 * expected to always be at the front.
 *
 * The relative order within the sort columns and within the remaining columns is preserved;
 * the input buffer is not modified.
 *
 * @param columnSchemas column schemas in their current order
 * @return a new buffer holding the sort columns followed by all other columns
 */
def reArrangeColumnSchema(
    columnSchemas: mutable.Buffer[ColumnSchema]): mutable.Buffer[ColumnSchema] = {
  // A single pass splits the input while keeping the original order on both sides.
  val (sortColumns, otherColumns) = columnSchemas.partition(_.isSortColumn)
  val newColumnSchemas = mutable.Buffer[ColumnSchema]()
  newColumnSchemas ++= sortColumns
  newColumnSchemas ++= otherColumns
  newColumnSchemas
}
/**
 * Evaluates `f` exactly once and reports how long the evaluation took.
 *
 * The duration is measured with the monotonic `System.nanoTime` clock, so it cannot become
 * negative when the wall clock is adjusted while `f` runs.
 *
 * @param f computation to time (evaluated by name)
 * @tparam T result type of the computation
 * @return the result of `f` paired with the elapsed time in milliseconds
 */
def logTime[T](f: => T): (T, Long) = {
  val startNanos = System.nanoTime()
  val response = f
  val elapsedMillis = (System.nanoTime() - startNanos) / 1000000L
  (response, elapsedMillis)
}