/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.management

import java.text.SimpleDateFormat
import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.command.UpdateTableModel
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{CarbonReflectionUtils, CollectionAccumulator, SparkUtil}

import org.apache.carbondata.common.{Maps, Strings}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants, SortScopeOptions}
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.index.{IndexStoreManager, TableIndex}
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.{DateDirectDictionaryGenerator, TimeStampGranularityTypeValue}
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.segmentmeta.{SegmentMetaDataInfo, SegmentMetaDataInfoStats}
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util._
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{BuildIndexPostExecutionEvent, BuildIndexPreExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.indexserver.DistributedRDDUtils
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark, GlobalSortHelper}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
import org.apache.carbondata.spark.util.CarbonScalaUtil

object CommonLoadUtils {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
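
  /**
   * Builds the audit info map (segment id plus human-readable data and index sizes)
   * from a load result; returns an empty map when the load produced no metadata.
   */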
def makeAuditInfo(loadResult: LoadMetadataDetails): Map[String, String] = {
if (loadResult != null) {
Map(
"SegmentId" -> loadResult.getLoadName,
"DataSize" -> Strings.formatSize(java.lang.Long.parseLong(loadResult.getDataSize)),
"IndexSize" -> Strings.formatSize(java.lang.Long.parseLong(loadResult.getIndexSize)))
} else {
Map()
}
}
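
  /**
   * Returns a dataframe in which the implicit tupleId column is replaced by a
   * derived segment-id column (extracted via a UDF), keeping all other fields.
   */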
def getDataFrameWithTupleID(dataFrame: Option[DataFrame]): DataFrame = {
val fields = dataFrame.get.schema.fields
import org.apache.spark.sql.functions.udf
// extracting only segment from tupleId
val getSegIdUDF = udf((tupleId: String) =>
CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
// getting all fields except tupleId field as it is not required in the value
val otherFields = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(fields)
// extract tupleId field which will be used as a key
val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
val fieldWithTupleId = otherFields :+ segIdColumn
// use dataFrameWithTupleId as loadDataFrame
val dataFrameWithTupleId = dataFrame.get.select(fieldWithTupleId: _*)
    dataFrameWithTupleId
}
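
  /**
   * Resolves the database name and CarbonTable (from the given TableInfo, or from the
   * catalog otherwise). For Hive partition tables it also resolves the LogicalRelation
   * and completes the partition spec with the table's dynamic partition columns.
   *
   * @return (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition)
   */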
def processMetadataCommon(sparkSession: SparkSession,
databaseNameOp: Option[String],
tableName: String,
tableInfoOp: Option[TableInfo],
partition: Map[String, Option[String]]): (Long, CarbonTable, String, LogicalRelation,
Map[String, Option[String]]) = {
val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
var table: CarbonTable = null
var logicalPartitionRelation: LogicalRelation = null
var sizeInBytes: Long = 0L
table = if (tableInfoOp.isDefined) {
CarbonTable.buildFromTableInfo(tableInfoOp.get)
} else {
CarbonEnv.getCarbonTable(Option(dbName), tableName)(sparkSession)
}
var finalPartition: Map[String, Option[String]] = Map.empty
if (table.isHivePartitionTable) {
logicalPartitionRelation =
new FindDataSourceTable(sparkSession).apply(
sparkSession.sessionState.catalog.lookupRelation(
TableIdentifier(tableName, databaseNameOp))).collect {
case l: LogicalRelation => l
}.head
finalPartition = getCompletePartitionValues(partition, table)
}
(sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition)
}
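
  /**
   * Creates a CarbonLoadModel for the given table and fills it (fact path, compressor,
   * range column, partition values and the remaining options) through
   * CarbonLoadModelBuilder.
   */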
def prepareLoadModel(hadoopConf: Configuration,
factPath: String,
optionsFinal: util.Map[String, String],
parentTablePath: String,
table: CarbonTable,
isDataFrame: Boolean,
internalOptions: Map[String, String],
partition: Map[String, Option[String]],
options: Map[java.lang.String, String]): CarbonLoadModel = {
val carbonLoadModel = new CarbonLoadModel()
carbonLoadModel.setFactFilePath(factPath)
carbonLoadModel.setCarbonTransactionalTable(table.getTableInfo.isTransactionalTable)
carbonLoadModel.setAggLoadRequest(
internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL,
CarbonCommonConstants.IS_INTERNAL_LOAD_CALL_DEFAULT).toBoolean)
carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
.getOrElse(CarbonCommonConstants.COMPRESSOR,
CompressorFactory.getInstance().getCompressor.getName)
carbonLoadModel.setColumnCompressor(columnCompressor)
carbonLoadModel.setRangePartitionColumn(table.getRangeColumn)
val javaPartition = mutable.Map[String, String]()
partition.foreach { case (k, v) =>
if (v.isEmpty) {
javaPartition(k) = null
} else {
javaPartition(k) = v.get
}
}
new CarbonLoadModelBuilder(table).build(
options.asJava,
optionsFinal,
carbonLoadModel,
hadoopConf,
javaPartition.asJava,
isDataFrame)
carbonLoadModel
}
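
  /**
   * Resolves carbon.number.of.cores.while.loading, falling back to spark.executor.cores,
   * and writes the resolved value back to CarbonProperties.
   */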
def setNumberOfCoresWhileLoading(sparkSession: SparkSession): CarbonProperties = {
// get the value of 'spark.executor.cores' from spark conf, default value is 1
val sparkExecutorCores = sparkSession.sparkContext.conf.get("spark.executor.cores", "1")
// get the value of 'carbon.number.of.cores.while.loading' from carbon properties,
// default value is the value of 'spark.executor.cores'
val numCoresLoading =
try {
CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.NUM_CORES_LOADING, sparkExecutorCores)
} catch {
case exc: NumberFormatException =>
LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING
+ " is wrong. Falling back to the default value "
+ sparkExecutorCores)
sparkExecutorCores
}
// update the property with new value
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.NUM_CORES_LOADING, numCoresLoading)
}
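
  /**
   * Returns the table's current Hive partition specs, or null when the table is not
   * partitioned (or the partitions cannot be resolved).
   */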
def getCurrentPartitions(sparkSession: SparkSession,
table: CarbonTable): util.List[PartitionSpec] = {
val currPartitions = if (table.isHivePartitionTable) {
CarbonFilters.getCurrentPartitions(
sparkSession,
table) match {
case Some(parts) => new util.ArrayList(parts.toList.asJava)
case _ => null
}
} else {
null
}
currPartitions
}
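
  /**
   * Merges the user-supplied load options with defaults and table properties, resolving
   * the sort scope, global sort partitions, bad-record path and date/timestamp formats.
   *
   * Illustrative usage (carbonTable is an assumed, already-resolved CarbonTable):
   * {{{
   *   val optionsFinal = CommonLoadUtils.getFinalLoadOptions(
   *     carbonTable, Map("sort_scope" -> "no_sort"))
   * }}}
   */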
def getFinalLoadOptions(
table: CarbonTable,
options: Map[String, String]): util.Map[String, String] = {
val tableProperties = table.getTableInfo.getFactTable.getTableProperties
val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
// For legacy store, it uses a different header option by default.
EnvHelper.setDefaultHeader(SparkSQLUtil.getSparkSession, optionsFinal)
/**
* Priority of sort_scope assignment :
* -----------------------------------
*
* 1. Load Options ->
* LOAD DATA INPATH 'data.csv' INTO TABLE tableName OPTIONS('sort_scope'='no_sort')
*
* 2. Session property CARBON_TABLE_LOAD_SORT_SCOPE ->
* SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=local_sort
*
* 3. Sort Scope provided in TBLPROPERTIES
* 4. Session property CARBON_OPTIONS_SORT_SCOPE
* 5. Default Sort Scope LOAD_SORT_SCOPE
*/
val carbonProperty = CarbonProperties.getInstance
if (table.getNumberOfSortColumns == 0) {
      // If no sort columns are configured (tableProperties.SORT_COLUMNS is empty)
optionsFinal.put(CarbonCommonConstants.SORT_SCOPE,
SortScopeOptions.SortScope.NO_SORT.name)
} else if (StringUtils.isBlank(tableProperties.get(CarbonCommonConstants.SORT_SCOPE))) {
      // If sort columns are configured
      // but tableProperties.SORT_SCOPE is not set
optionsFinal.put(CarbonCommonConstants.SORT_SCOPE,
options.getOrElse(CarbonCommonConstants.SORT_SCOPE,
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE +
table.getDatabaseName + "." + table.getTableName,
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
SortScopeOptions.SortScope.LOCAL_SORT.name)))))
} else {
optionsFinal.put(CarbonCommonConstants.SORT_SCOPE,
options.getOrElse(CarbonCommonConstants.SORT_SCOPE,
carbonProperty.getProperty(
CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
table.getTableName,
tableProperties.asScala.getOrElse(CarbonCommonConstants.SORT_SCOPE,
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))))
if (!StringUtils.isBlank(tableProperties.get("global_sort_partitions"))) {
if (options.get("global_sort_partitions").isEmpty) {
optionsFinal.put(
"global_sort_partitions",
tableProperties.get("global_sort_partitions"))
}
}
}
optionsFinal
.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
// If DATEFORMAT is not present in load options, check from table properties.
if (optionsFinal.get("dateformat").isEmpty) {
optionsFinal.put("dateformat", Maps.getOrDefault(tableProperties,
"dateformat", CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)))
}
// If TIMESTAMPFORMAT is not present in load options, check from table properties.
if (optionsFinal.get("timestampformat").isEmpty) {
optionsFinal.put("timestampformat", Maps.getOrDefault(tableProperties,
"timestampformat", CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)))
}
optionsFinal
}
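
  /**
   * Fires LoadTablePreExecutionEvent and, when the table has CG or FG indexes,
   * BuildIndexPreExecutionEvent. Returns the table indexes together with the index
   * operation context so that the matching post-load events can be fired later.
   */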
def firePreLoadEvents(
sparkSession: SparkSession,
carbonLoadModel: CarbonLoadModel,
uuid: String,
factPath: String,
optionsFinal: util.Map[String, String],
options: util.Map[String, String],
isOverwriteTable: Boolean,
isDataFrame: Boolean,
updateModel: Option[UpdateTableModel],
operationContext: OperationContext): (util.List[TableIndex], OperationContext) = {
operationContext.setProperty("uuid", uuid)
if (updateModel.isDefined && updateModel.get.isUpdate) {
operationContext.setProperty("isLoadOrCompaction", false)
}
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
new LoadTablePreExecutionEvent(
table.getCarbonTableIdentifier,
carbonLoadModel,
factPath,
isDataFrame,
optionsFinal,
options,
isOverwriteTable)
operationContext.setProperty("isOverwrite", isOverwriteTable)
OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
    // Add the pre-execution event listener for the table's CG and FG indexes
val tableIndexes = IndexStoreManager.getInstance().getAllCGAndFGIndexes(table)
val indexOperationContext = new OperationContext()
if (tableIndexes.size() > 0) {
val indexNames: mutable.Buffer[String] =
tableIndexes.asScala.map(index => index.getIndexSchema.getIndexName)
val buildIndexPreExecutionEvent: BuildIndexPreExecutionEvent =
BuildIndexPreExecutionEvent(sparkSession,
table.getAbsoluteTableIdentifier, indexNames)
OperationListenerBus.getInstance().fireEvent(buildIndexPreExecutionEvent,
indexOperationContext)
}
(tableIndexes, indexOperationContext)
}
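
  /**
   * Fires LoadTablePostExecutionEvent and, for the indexes returned by
   * firePreLoadEvents, BuildIndexPostExecutionEvent.
   */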
def firePostLoadEvents(sparkSession: SparkSession,
carbonLoadModel: CarbonLoadModel,
tableIndexes: util.List[TableIndex],
indexOperationContext: OperationContext,
table: CarbonTable,
operationContext: OperationContext): Unit = {
val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
new LoadTablePostExecutionEvent(
table.getCarbonTableIdentifier,
carbonLoadModel)
OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
if (tableIndexes.size() > 0) {
val buildIndexPostExecutionEvent = BuildIndexPostExecutionEvent(sparkSession,
table.getAbsoluteTableIdentifier, null, Seq(carbonLoadModel.getSegmentId), false)
OperationListenerBus.getInstance()
.fireEvent(buildIndexPostExecutionEvent, indexOperationContext)
}
}
  /**
   * Converts the RDD through the data-loading input processor and converter steps.
   *
   * @param originRDD input rows, either from a dataframe or from a CSV scan
   * @param sparkSession spark session
   * @param model carbon load model
   * @param isDataFrame true when the input comes from a dataframe
   * @param partitionValues static partition values, appended for CSV input
   * @return the converted data as an RDD of InternalRow
   */
private def convertData(
originRDD: RDD[Row],
sparkSession: SparkSession,
model: CarbonLoadModel,
isDataFrame: Boolean,
partitionValues: Array[String]): RDD[InternalRow] = {
val sc = sparkSession.sparkContext
val info =
model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
info.setColumnSchemaList(new util.ArrayList[ColumnSchema](info.getColumnSchemaList))
val modelBroadcast = sc.broadcast(model)
val partialSuccessAccum = sc.longAccumulator("Partial Success Accumulator")
val inputStepRowCounter = sc.longAccumulator("Input Processor Accumulator")
// 1. Input
val convertRDD =
if (isDataFrame) {
originRDD.mapPartitions{rows =>
DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast)
}
} else {
// Append the partition columns in case of static partition scenario
val partitionLen = partitionValues.length
val len = model.getCsvHeaderColumns.length - partitionLen
originRDD.map{ row =>
val array = new Array[AnyRef](len + partitionLen)
var i = 0
while (i < len) {
array(i) = row.get(i).asInstanceOf[AnyRef]
i = i + 1
}
if (partitionLen > 0) {
System.arraycopy(partitionValues, 0, array, i, partitionLen)
}
array
}
}
val conf = SparkSQLUtil
.broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
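    // 2. Convert: run the input processor and converter steps, dropping failed rows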
val finalRDD = convertRDD.mapPartitionsWithIndex { case(index, rows) =>
DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
DataLoadProcessorStepOnSpark.inputAndConvertFunc(
rows,
index,
modelBroadcast,
partialSuccessAccum,
inputStepRowCounter,
keepActualData = true)
}.filter(_ != null).map(row => InternalRow.fromSeq(row.getData))
finalRDD
}
  /**
   * Transforms the RDD into a logical plan according to the sort_scope: a sort plan
   * for global sort scope, otherwise a plain project plan.
   */
def transformQueryWithRow(rdd: RDD[Row],
sparkSession: SparkSession,
loadModel: CarbonLoadModel,
partitionValues: Array[String],
catalogTable: CatalogTable,
curAttributes: Seq[AttributeReference],
sortScope: SortScopeOptions.SortScope,
isDataFrame: Boolean,
table: CarbonTable,
partition: Map[String, Option[String]]): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
    // Convert the data through the loading steps before handing it to the writer or sorter
val updatedRdd = convertData(
rdd,
sparkSession,
loadModel,
isDataFrame,
partitionValues)
transformQuery(updatedRdd,
sparkSession,
loadModel,
partitionValues,
catalogTable,
curAttributes,
sortScope,
isNoRearrangeFlow = false,
table,
partition)
}
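
  /**
   * Builds the logical plan (a Project over a LogicalRDD, globally sorted when the
   * sort scope requires it) for already-converted rows.
   *
   * @return (plan, number of input partitions, RDD persisted for global sort, if any)
   */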
def transformQuery(updatedRdd: RDD[InternalRow],
sparkSession: SparkSession,
loadModel: CarbonLoadModel,
partitionValues: Array[String],
catalogTable: CatalogTable,
curAttributes: Seq[AttributeReference],
sortScope: SortScopeOptions.SortScope,
isNoRearrangeFlow: Boolean,
table: CarbonTable,
partition: Map[String, Option[String]]): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
val catalogAttributes = catalogTable.schema.toAttributes
    // Resolve the current attributes against the catalog schema
var attributes = curAttributes.map(a => {
catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get
})
attributes = attributes.map { attr =>
      // Update the attribute data type for dictionary (date) columns,
      // whose stored datatype is always int
val column = table.getColumnByName(attr.name)
val updatedDataType = if (column.getDataType ==
org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
IntegerType
} else {
if (isNoRearrangeFlow) {
attr.dataType match {
case TimestampType | DateType =>
LongType
// complex type will be converted in CarbonOutputWriter.writeCarbon
case _ =>
attr.dataType
}
} else {
attr.dataType match {
case TimestampType | DateType =>
LongType
case _: StructType | _: ArrayType | _: MapType =>
BinaryType
case _ =>
attr.dataType
}
}
}
CarbonToSparkAdapter.createAttributeReference(attr.name,
updatedDataType,
attr.nullable,
attr.metadata,
attr.exprId,
attr.qualifier)
}
// Only select the required columns
var output = if (partition.nonEmpty) {
val lowerCasePartition = partition.map { case (key, value) => (key.toLowerCase, value) }
catalogTable.schema.map { attr =>
attributes.find(_.name.equalsIgnoreCase(attr.name)).get
}.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty)
} else {
catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
}
    // Move the partition columns to the end of the output list
if (catalogTable.partitionColumnNames.nonEmpty &&
(loadModel.getCarbonDataLoadSchema.getCarbonTable.isMV) && output.nonEmpty) {
val partitionOutPut =
catalogTable.partitionColumnNames.map(col => output.find(_.name.equalsIgnoreCase(col)).get)
output = output.filterNot(partitionOutPut.contains(_)) ++ partitionOutPut
}
val partitionsLen = updatedRdd.partitions.length
// If it is global sort scope then apply sort logical plan on the sort columns
if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) {
      // If the number of partitions is greater than 1, sortBy triggers an action (sampling),
      // so cache the RDD here to avoid running the input and convert steps again.
if (partitionsLen > 1) {
updatedRdd.persist(StorageLevel.fromString(
CarbonProperties.getInstance().getGlobalSortRddStorageLevel))
}
var numPartitions =
CarbonDataProcessorUtil.getGlobalSortPartitions(loadModel.getGlobalSortPartitions)
if (numPartitions <= 0) {
numPartitions = partitionsLen
}
val sortColumns =
if (table.isHivePartitionTable) {
          // When a partition column is also a sort column, it is not at the front of the
          // attribute list, so look up each sort column's attribute explicitly.
val sortColsAttr: collection.mutable.ArrayBuffer[AttributeReference] = ArrayBuffer()
val sortCols = table.getSortColumns.asScala
for (sortColumn <- sortCols) {
sortColsAttr += attributes.find(x => x.name.equalsIgnoreCase(sortColumn)).get
}
sortColsAttr
} else {
attributes.take(table.getSortColumns.size())
}
val attributesWithIndex = attributes.zipWithIndex
val dataTypes = sortColumns.map { column =>
val attributeWithIndex =
attributesWithIndex.find(x => x._1.name.equalsIgnoreCase(column.name))
(column.dataType, attributeWithIndex.get._2)
}
val sortedRDD: RDD[InternalRow] =
GlobalSortHelper.sortBy(updatedRdd, numPartitions, dataTypes)
val outputOrdering = sortColumns.map(SortOrder(_, Ascending))
(
Project(
output,
LogicalRDD(attributes, sortedRDD, outputOrdering = outputOrdering)(sparkSession)
),
partitionsLen,
Some(updatedRdd)
)
} else {
(
Project(output, LogicalRDD(attributes, updatedRdd)(sparkSession)),
partitionsLen,
None
)
}
}
  /**
   * Transforms the RDD into a logical plan according to the sort_scope: a sort plan
   * for global sort scope, otherwise a plain project plan.
   */
def transformQueryWithInternalRow(rdd: RDD[InternalRow],
sparkSession: SparkSession,
loadModel: CarbonLoadModel,
partitionValues: Array[String],
catalogTable: CatalogTable,
curAttributes: Seq[AttributeReference],
sortScope: SortScopeOptions.SortScope,
table: CarbonTable,
partition: Map[String, Option[String]]): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
    // keep the partition columns at the end, if any exist
var colSchema = table.getTableInfo
.getFactTable
.getListOfColumns
.asScala
if (table.getPartitionInfo != null) {
colSchema = colSchema.filterNot(x => x.isInvisible || x.isComplexColumn ||
x.getSchemaOrdinal == -1 ||
table.getPartitionInfo.getColumnSchemaList.contains(x))
colSchema = colSchema ++ table
.getPartitionInfo
.getColumnSchemaList.toArray(new Array[ColumnSchema](table
.getPartitionInfo
.getColumnSchemaList.size()))
.asInstanceOf[Array[ColumnSchema]]
} else {
colSchema = colSchema.filterNot(x => x.isInvisible || x.isComplexColumn ||
x.getSchemaOrdinal == -1)
}
val updatedRdd: RDD[InternalRow] = CommonLoadUtils.getConvertedInternalRow(
colSchema,
rdd,
sortScope == SortScopeOptions.SortScope.GLOBAL_SORT)
transformQuery(updatedRdd,
sparkSession,
loadModel,
partitionValues,
catalogTable,
curAttributes,
sortScope,
isNoRearrangeFlow = true,
table,
partition)
}
  /**
   * Creates the logical plan for the update scenario. The trailing segmentId column is
   * dropped from the input dataframe, since it is not needed for the write.
   */
def getLogicalQueryForUpdate(
sparkSession: SparkSession,
catalogTable: CatalogTable,
df: DataFrame,
carbonLoadModel: CarbonLoadModel): LogicalPlan = {
SparkUtil.setNullExecutionId(sparkSession)
    // For update on a partitioned table, the trailing segmentId column is not needed
val dropAttributes = df.logicalPlan.output.dropRight(1)
val finalOutput = catalogTable.schema.map { attr =>
dropAttributes.find { d =>
val index = d.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)
if (index > 0) {
d.name.substring(0, index).equalsIgnoreCase(attr.name)
} else {
d.name.equalsIgnoreCase(attr.name)
}
}.get
}
carbonLoadModel.setCsvHeader(catalogTable.schema.map(_.name.toLowerCase).mkString(","))
carbonLoadModel.setCsvHeaderColumns(carbonLoadModel.getCsvHeader.split(","))
Project(finalOutput, df.logicalPlan)
}
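
  /**
   * Converts the catalog table into a LogicalRelation backed by a HadoopFsRelation with
   * SparkCarbonTableFormat, passing load-specific options (overwrite flag, static
   * partitions, update metadata, current load entry) through the relation options so
   * that the file format can read them at write time.
   */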
def convertToLogicalRelation(
catalogTable: CatalogTable,
sizeInBytes: Long,
overWrite: Boolean,
loadModel: CarbonLoadModel,
sparkSession: SparkSession,
operationContext: OperationContext,
partition: Map[String, Option[String]],
updateModel: Option[UpdateTableModel],
optionsOriginal: mutable.Map[String, String],
currPartitions: util.List[PartitionSpec]): LogicalRelation = {
val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
val metastoreSchema =
if (optionsOriginal.contains(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS)) {
StructType(catalogTable.schema.fields.map{f =>
val column = table.getColumnByName(f.name)
val updatedDataType = if (column.getDataType ==
org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
IntegerType
} else {
f.dataType match {
case TimestampType | DateType =>
LongType
// complex type will be converted in CarbonOutputWriter.writeCarbon
case _ =>
f.dataType
}
}
f.copy(dataType = updatedDataType)
})
} else {
StructType(catalogTable.schema.fields.map{f =>
val column = table.getColumnByName(f.name)
val updatedDataType = if (column.getDataType ==
org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
IntegerType
} else {
f.dataType match {
case TimestampType | DateType =>
LongType
case _: StructType | _: ArrayType | _: MapType =>
BinaryType
case _ =>
f.dataType
}
}
f.copy(dataType = updatedDataType)
})
}
val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
val catalog = new CatalogFileIndex(
sparkSession, catalogTable, sizeInBytes)
if (!lazyPruningEnabled) {
catalog.filterPartitions(Nil) // materialize all the partitions in memory
}
var partitionSchema =
StructType(table.getPartitionInfo.getColumnSchemaList.asScala.map(field =>
metastoreSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get))
val dataSchema =
StructType(metastoreSchema
.filterNot(field => partitionSchema.contains(field)))
if (partition.nonEmpty) {
partitionSchema = StructType(partitionSchema.fields.map(_.copy(dataType = StringType)))
}
val options = new mutable.HashMap[String, String]()
options ++= catalogTable.storage.properties
options += (("overwrite", overWrite.toString))
if (partition.nonEmpty) {
val staticPartitionStr = ObjectSerializationUtil.convertObjectToString(
new util.HashMap[String, Boolean](
partition.map { case (col, value) => (col.toLowerCase, value.isDefined) }.asJava))
options += (("staticpartition", staticPartitionStr))
}
options ++= optionsOriginal
if (updateModel.isDefined) {
options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString))
if (updateModel.get.deletedSegments.nonEmpty) {
options += (("segmentsToBeDeleted",
updateModel.get.deletedSegments.map(_.getSegmentNo).mkString(",")))
}
}
if (currPartitions != null) {
val currPartStr = ObjectSerializationUtil.convertObjectToString(currPartitions)
options += (("currentpartition", currPartStr))
}
if (loadModel.getSegmentId != null) {
val currLoadEntry =
ObjectSerializationUtil.convertObjectToString(loadModel.getCurrentLoadMetadataDetail)
options += (("currentloadentry", currLoadEntry))
}
val hdfsRelation = HadoopFsRelation(
location = catalog,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
bucketSpec = catalogTable.bucketSpec,
fileFormat = new SparkCarbonTableFormat,
options = options.toMap)(sparkSession = sparkSession)
if (options.contains(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS)) {
CarbonReflectionUtils.getLogicalRelation(hdfsRelation,
metastoreSchema.toAttributes,
Some(catalogTable),
false)
} else {
CarbonReflectionUtils.getLogicalRelation(hdfsRelation,
hdfsRelation.schema.toAttributes,
Some(catalogTable),
false)
}
}
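
  /**
   * Normalizes rows for writing: timestamp values are rescaled by the granularity
   * factor with zero treated as null, NaN and infinite doubles become null, and date
   * ordinals are shifted by the direct-dictionary cut-off (null mapped to the
   * dictionary null value). Rows are copied first in the global sort partition flow.
   */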
def getConvertedInternalRow(
columnSchema: Seq[ColumnSchema],
rdd: RDD[InternalRow],
isGlobalSortPartition: Boolean): RDD[InternalRow] = {
    // Collect the column indexes that need value conversion (timestamp, date and double)
    val timeStampIndex = scala.collection.mutable.Set[Int]()
    val dateIndex = scala.collection.mutable.Set[Int]()
    val doubleIndex = scala.collection.mutable.Set[Int]()
var i: Int = 0
for (col <- columnSchema) {
if (col.getDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
timeStampIndex += i
} else if (col.getDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
        dateIndex += i
} else if (col.getDataType ==
org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
doubleIndex += i
}
i = i + 1
}
val updatedRdd: RDD[InternalRow] = rdd.map { internalRowOriginal =>
val internalRow = if (isGlobalSortPartition) {
        // In the insert stage command and global sort partition flows (where the
        // RDD[InternalRow] is persisted), the logical plan already consists of a
        // LogicalRDD of InternalRow. When it is converted to a DataFrame, Spark reuses
        // the same InternalRow, so a copy is needed before the last transformation.
        // TODO: even though copying the InternalRow is fast, avoid it by finding a
        // better way.
internalRowOriginal.copy()
} else {
internalRowOriginal
}
for (index <- timeStampIndex) {
if (internalRow.getLong(index) == 0) {
internalRow.setNullAt(index)
} else {
internalRow.setLong(
index,
internalRow.getLong(index) / TimeStampGranularityTypeValue.MILLIS_SECONDS.getValue)
}
}
var doubleValue: Double = 0
for (index <- doubleIndex) {
doubleValue = internalRow.getDouble(index)
if (doubleValue.isNaN || doubleValue.isInfinite) {
          // the converter sets null for NaN or infinite values
internalRow.setNullAt(index)
}
}
for (index <- dateIndex) {
if (internalRow.isNullAt(index)) {
// null values must be replaced with direct dictionary null values
internalRow.setInt(index, CarbonCommonConstants.DIRECT_DICT_VALUE_NULL)
} else {
internalRow.setInt(index,
internalRow.getInt(index) + DateDirectDictionaryGenerator.cutOffDate)
}
}
internalRow
}
updatedRdd
}
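
  /**
   * Returns the (timestamp, date) SimpleDateFormat pair from the load model, falling
   * back to the model's default formats when none are set.
   */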
def getTimeAndDateFormatFromLoadModel(loadModel: CarbonLoadModel): (SimpleDateFormat,
SimpleDateFormat) = {
var timestampFormatString = loadModel.getTimestampFormat
if (timestampFormatString.isEmpty) {
timestampFormatString = loadModel.getDefaultTimestampFormat
}
val timeStampFormat = new SimpleDateFormat(timestampFormatString)
var dateFormatString = loadModel.getDateFormat
if (dateFormatString.isEmpty) {
dateFormatString = loadModel.getDefaultDateFormat
}
val dateFormat = new SimpleDateFormat(dateFormatString)
(timeStampFormat, dateFormat)
}
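
  /**
   * When the supplied partition spec covers only some of the table's partition columns,
   * appends the remaining columns as dynamic partitions (value None).
   */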
def getCompletePartitionValues(partition: Map[String, Option[String]],
table: CarbonTable): Map[String, Option[String]] = {
if (partition.nonEmpty) {
val lowerCasePartitionMap = partition.map { entry =>
(entry._1.toLowerCase, entry._2)
}
val partitionsColumns = table.getPartitionInfo.getColumnSchemaList
if (partition.size != partitionsColumns.size()) {
val dynamicPartitions = {
partitionsColumns
.asScala
.filter { column =>
!lowerCasePartitionMap.contains(column.getColumnName)
}
.map { column =>
(column.getColumnName, None)
}
}
partition ++ dynamicPartitions
} else {
partition
}
} else {
partition
}
}
  /**
   * Loads the data the Hive-partition way. This method uses the InsertIntoTable command
   * to load data into the partitioned table; the table relation is converted to a
   * HadoopFsRelation so that Spark handles the partitioning.
   */
def loadDataWithPartition(loadParams: CarbonLoadParams): Seq[Row] = {
val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val catalogTable: CatalogTable = loadParams.logicalPartitionRelation.catalogTable.get
CarbonUtils.threadSet("partition.operationcontext", loadParams.operationContext)
val attributes = if (loadParams.scanResultRDD.isDefined) {
// take the already re-arranged attributes
catalogTable.schema.toAttributes
} else {
// input data from csv files. Convert to logical plan
val allCols = new ArrayBuffer[String]()
// get only the visible dimensions from table
allCols ++= table.getVisibleDimensions.asScala.map(_.getColName)
allCols ++= table.getVisibleMeasures.asScala.map(_.getColName)
StructType(
allCols.filterNot(_.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)).map(
StructField(_, StringType))).toAttributes
}
var partitionsLen = 0
val sortScope = CarbonDataProcessorUtil.getSortScope(loadParams.carbonLoadModel.getSortScope)
val partitionValues = if (loadParams.finalPartition.nonEmpty) {
loadParams.finalPartition.filter(_._2.nonEmpty).map { case (col, value) =>
catalogTable.schema.find(_.name.equalsIgnoreCase(col)) match {
case Some(c) =>
CarbonScalaUtil.convertToDateAndTimeFormats(
value.get,
c.dataType,
loadParams.timeStampFormat,
loadParams.dateFormat)
case None =>
throw new AnalysisException(s"$col is not a valid partition column in table ${
loadParams.carbonLoadModel
.getDatabaseName
}.${ loadParams.carbonLoadModel.getTableName }")
}
}.toArray
} else {
Array[String]()
}
var persistedRDD: Option[RDD[InternalRow]] = None
try {
val query: LogicalPlan = if ((loadParams.dataFrame.isDefined) ||
loadParams.scanResultRDD.isDefined) {
val (rdd, dfAttributes) =
if (loadParams.updateModel.isDefined && !loadParams.updateModel.get.loadAsNewSegment) {
// Get the updated query plan in case of update scenario
val updatedFrame = Dataset.ofRows(
loadParams.sparkSession,
getLogicalQueryForUpdate(
loadParams.sparkSession,
catalogTable,
loadParams.dataFrame.get,
loadParams.carbonLoadModel))
(updatedFrame.rdd, updatedFrame.schema)
} else {
if (loadParams.finalPartition.nonEmpty) {
val headers = loadParams.carbonLoadModel
.getCsvHeaderColumns
.dropRight(loadParams.finalPartition.size)
val updatedHeader = headers ++ loadParams.finalPartition.keys.map(_.toLowerCase)
loadParams.carbonLoadModel.setCsvHeader(updatedHeader.mkString(","))
loadParams.carbonLoadModel
.setCsvHeaderColumns(loadParams.carbonLoadModel.getCsvHeader.split(","))
}
if (loadParams.dataFrame.isDefined) {
(loadParams.dataFrame.get.rdd, loadParams.dataFrame.get.schema)
} else {
(null, null)
}
}
if (loadParams.dataFrame.isDefined) {
val expectedColumns = {
val staticPartCols = loadParams.finalPartition.filter(_._2.isDefined).keySet
.map(columnName => columnName.toLowerCase())
attributes.filterNot(a => staticPartCols.contains(a.name.toLowerCase))
}
if (expectedColumns.length != dfAttributes.length) {
throw new AnalysisException(
s"Cannot insert into table $loadParams.tableName because the number of columns are " +
s"different: " +
s"need ${ expectedColumns.length } columns, " +
s"but query has ${ dfAttributes.length } columns.")
}
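          // Map each dataframe column to its target slot: dynamic partition columns
          // follow the non-partition columns, while static partition values are placed
          // at the positions recorded in partitionBounds.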
val nonPartitionBounds = expectedColumns.zipWithIndex.map(_._2).toArray
val partitionBounds = new Array[Int](partitionValues.length)
if (loadParams.finalPartition.nonEmpty) {
val nonPartitionSchemaLen = attributes.length - loadParams.finalPartition.size
var i = nonPartitionSchemaLen
var index = 0
var partIndex = 0
loadParams.finalPartition.values.foreach { p =>
if (p.isDefined) {
partitionBounds(partIndex) = nonPartitionSchemaLen + index
partIndex = partIndex + 1
} else {
nonPartitionBounds(i) = nonPartitionSchemaLen + index
i = i + 1
}
index = index + 1
}
}
val len = dfAttributes.length + partitionValues.length
val transRdd = rdd.map { f =>
val data = new Array[Any](len)
var i = 0
val length = f.length
while (i < length) {
data(nonPartitionBounds(i)) = f.get(i)
i = i + 1
}
var j = 0
val boundLength = partitionBounds.length
while (j < boundLength) {
data(partitionBounds(j)) = UTF8String.fromString(partitionValues(j))
j = j + 1
}
Row.fromSeq(data)
}
val (transformedPlan, partitions, persistedRDDLocal) =
transformQueryWithRow(
transRdd,
loadParams.sparkSession,
loadParams.carbonLoadModel,
partitionValues,
catalogTable,
attributes,
sortScope,
isDataFrame = true, table, loadParams.finalPartition)
partitionsLen = partitions
persistedRDD = persistedRDDLocal
transformedPlan
} else {
val (transformedPlan, partitions, persistedRDDLocal) =
CommonLoadUtils.transformQueryWithInternalRow(
loadParams.scanResultRDD.get,
loadParams.sparkSession,
loadParams.carbonLoadModel,
partitionValues,
catalogTable,
attributes,
sortScope,
table,
loadParams.finalPartition)
partitionsLen = partitions
persistedRDD = persistedRDDLocal
transformedPlan
}
} else {
val columnCount = loadParams.carbonLoadModel.getCsvHeaderColumns.length
val rdd = CsvRDDHelper.csvFileScanRDD(
loadParams.sparkSession,
model = loadParams.carbonLoadModel,
loadParams.hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
val (transformedPlan, partitions, persistedRDDLocal) =
transformQueryWithRow(
rdd.asInstanceOf[RDD[Row]],
loadParams.sparkSession,
loadParams.carbonLoadModel,
partitionValues,
catalogTable,
attributes,
sortScope,
isDataFrame = false,
table,
loadParams.finalPartition)
partitionsLen = partitions
persistedRDD = persistedRDDLocal
transformedPlan
}
if (loadParams.updateModel.isDefined) {
loadParams.carbonLoadModel.setFactTimeStamp(loadParams.updateModel.get.updatedTimeStamp)
} else if (loadParams.carbonLoadModel.getFactTimeStamp == 0L) {
loadParams.carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
}
val opt = collection.mutable.Map() ++ loadParams.optionsOriginal
if (loadParams.scanResultRDD.isDefined) {
opt += ((DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS, "true"))
}
      // Create and add the segment to the table status.
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadParams.carbonLoadModel,
loadParams.isOverwriteTable)
val convertRelation = convertToLogicalRelation(
catalogTable,
loadParams.sizeInBytes,
loadParams.isOverwriteTable,
loadParams.carbonLoadModel,
loadParams.sparkSession,
loadParams.operationContext,
loadParams.finalPartition,
loadParams.updateModel,
opt,
loadParams.currPartitions)
val convertedPlan =
CarbonReflectionUtils.getInsertIntoCommand(
table = convertRelation,
partition = loadParams.finalPartition,
query = query,
overwrite = false,
ifPartitionNotExists = false)
SparkUtil.setNullExecutionId(loadParams.sparkSession)
Dataset.ofRows(loadParams.sparkSession, convertedPlan).collect()
} catch {
case ex: Throwable =>
val (executorMessage, errorMessage) = CarbonScalaUtil.retrieveAndLogErrorMsg(ex, LOGGER)
if (loadParams.updateModel.isDefined) {
CarbonScalaUtil.updateErrorInUpdateModel(loadParams.updateModel.get, executorMessage)
}
LOGGER.info(errorMessage)
LOGGER.error(ex)
throw ex
} finally {
CarbonUtils.threadUnset("partition.operationcontext")
if (loadParams.isOverwriteTable) {
IndexStoreManager.getInstance().clearIndex(table.getAbsoluteTableIdentifier)
// Clean the overwriting segments if any.
SegmentFileStore.cleanSegments(
table,
null,
false)
}
if (partitionsLen > 1) {
        // Clean the cache only if the RDD was persisted. unpersist is kept non-blocking:
        // this has no functional impact, since Spark automatically monitors cache usage
        // on each node and drops old data partitions in least-recently-used (LRU) fashion.
persistedRDD match {
case Some(rdd) => rdd.unpersist(false)
case _ =>
}
}
// Pre-priming for Partition table here
if (!StringUtils.isEmpty(loadParams.carbonLoadModel.getSegmentId)) {
DistributedRDDUtils.triggerPrepriming(loadParams.sparkSession,
table,
Seq(),
loadParams.operationContext,
loadParams.hadoopConf,
List(loadParams.carbonLoadModel.getSegmentId))
}
}
try {
val compactedSegments = new util.ArrayList[String]()
// Trigger auto compaction
CarbonDataRDDFactory.handleSegmentMerging(
loadParams.sparkSession.sqlContext,
loadParams.carbonLoadModel
.getCopyWithPartition(loadParams.carbonLoadModel.getCsvHeader,
loadParams.carbonLoadModel.getCsvDelimiter),
table,
compactedSegments,
loadParams.operationContext)
loadParams.carbonLoadModel.setMergedSegmentIds(compactedSegments)
} catch {
case e: Exception =>
LOGGER.error(
"Auto-Compaction has failed. Ignoring this exception because the " +
"load is passed.", e)
}
val specs =
SegmentFileStore.getPartitionSpecs(loadParams.carbonLoadModel.getSegmentId,
loadParams.carbonLoadModel.getTablePath,
loadParams.carbonLoadModel.getLoadMetadataDetails.asScala.toArray)
if (specs != null) {
specs.asScala.map { spec =>
Row(spec.getPartitions.asScala.mkString("/"), spec.getLocation.toString, spec.getUuid)
}
} else {
Seq.empty[Row]
}
}
  /**
   * Fills the segment-level metadata into the accumulator for the given tableName
   * and segmentId, then clears the collected stats for that segment.
   */
def fillSegmentMetaDataInfoToAccumulator(
tableName: String,
segmentId: String,
segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
: CollectionAccumulator[Map[String, SegmentMetaDataInfo]] = {
synchronized {
val segmentMetaDataInfo = SegmentMetaDataInfoStats.getInstance()
.getTableSegmentMetaDataInfo(tableName, segmentId)
if (null != segmentMetaDataInfo) {
segmentMetaDataAccumulator.add(scala.Predef
.Map(segmentId -> segmentMetaDataInfo))
SegmentMetaDataInfoStats.getInstance().clear(tableName, segmentId)
}
segmentMetaDataAccumulator
}
}
/**
* Collect segmentMetaDataInfo from all tasks and compare min-max values and prepare final
* segmentMetaDataInfo
*
* @param segmentId collect the segmentMetaDataInfo for the corresponding segmentId
* @param metaDataAccumulator segmentMetaDataAccumulator
* @return segmentMetaDataInfo
*/
def getSegmentMetaDataInfoFromAccumulator(
segmentId: String,
metaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
: SegmentMetaDataInfo = {
var segmentMetaDataInfo: SegmentMetaDataInfo = null
if (!metaDataAccumulator.isZero) {
val segmentMetaData = metaDataAccumulator.value.asScala
segmentMetaData.foreach { segmentColumnMetaDataInfo =>
val currentValue = segmentColumnMetaDataInfo.get(segmentId)
if (currentValue.isDefined) {
if (null == segmentMetaDataInfo) {
segmentMetaDataInfo = currentValue.get
} else if (segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap.isEmpty) {
segmentMetaDataInfo = currentValue.get
} else {
val iterator = currentValue.get.getSegmentColumnMetaDataInfoMap
.entrySet()
.iterator()
while (iterator.hasNext) {
val currentSegmentColumnMetaIndex = iterator.next()
if (segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
.containsKey(currentSegmentColumnMetaIndex.getKey)) {
val currentMax = SegmentMetaDataInfoStats.getInstance()
.compareAndUpdateMinMax(segmentMetaDataInfo
.getSegmentColumnMetaDataInfoMap
.get(currentSegmentColumnMetaIndex.getKey)
.getColumnMaxValue,
currentSegmentColumnMetaIndex.getValue.getColumnMaxValue,
false)
val currentMin = SegmentMetaDataInfoStats.getInstance()
.compareAndUpdateMinMax(segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
.get(currentSegmentColumnMetaIndex.getKey).getColumnMinValue,
currentSegmentColumnMetaIndex.getValue.getColumnMinValue,
true)
segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
.get(currentSegmentColumnMetaIndex.getKey)
.setColumnMaxValue(currentMax)
segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
.get(currentSegmentColumnMetaIndex.getKey)
.setColumnMinValue(currentMin)
}
}
}
}
}
}
segmentMetaDataInfo
}
}