/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.rdd
import java.io.IOException
import java.util
import java.util.{Collections, List, Map}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.reflect.classTag
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{InputSplit, Job}
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.util.{CarbonException, SparkTypeConverter}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.{CarbonCommonConstants, SortScopeOptions}
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.block._
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema}
import org.apache.carbondata.core.mutate.UpdateVO
import org.apache.carbondata.core.scan.expression
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatusManager, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger._
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.MergeResult
import org.apache.carbondata.spark.load.{ByteArrayOrdering, DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
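/**
 * RDD that performs compaction (merge) of CarbonData segments. Each partition merges the
 * blocks assigned to one task: it scans them through a CarbonCompactionExecutor and writes
 * the merged result via RowResultMergerProcessor when no re-sort is needed, or
 * CompactionResultSortProcessor otherwise. For IUD update/delete delta compaction the merged
 * data is written back to the source segment; when a range column is configured, tasks are
 * formed from value ranges computed by sampling the column.
 */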
class CarbonMergerRDD[K, V](
@transient private val ss: SparkSession,
result: MergeResult[K, V],
carbonLoadModel: CarbonLoadModel,
carbonMergerMapping: CarbonMergerMapping)
extends CarbonRDD[(K, V)](ss, Nil) {
ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
ss.sparkContext.setLocalProperty("spark.job.interruptOnCancel", "true")
private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
var storeLocation: String = null
var mergeResult: String = null
val tablePath = carbonMergerMapping.hdfsStoreLocation
val metadataFilePath = carbonMergerMapping.metadataFilePath
val mergedLoadName = carbonMergerMapping.mergedLoadName
val databaseName = carbonMergerMapping.databaseName
val factTableName = carbonMergerMapping.factTableName
val tableId = carbonMergerMapping.tableId
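// state for range-column based compaction: the configured range column (if any), whether it
// holds only a single distinct value, and the per-task filter expressions built from the
// computed range boundaries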
var rangeColumn: CarbonColumn = null
var singleRange = false
var expressionMapForRangeCol: util.Map[Integer, Expression] = null
var broadCastSplits: Broadcast[util.List[CarbonInputSplit]] = null
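// broadcasts all input splits to the executors; used only for range-column compaction with
// more than one distinct value, where every task reads all splits and filters rows by the
// range expression assigned to it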
def makeBroadCast(splits: util.List[CarbonInputSplit]): Unit = {
broadCastSplits = sparkContext.broadcast(splits)
}
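// Executes the merge for one Spark partition: resolves the splits for this task, derives the
// segment properties and cardinality from the data file footers, runs the
// CarbonCompactionExecutor over the blocks and feeds the raw result iterators into the
// selected result processor.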
override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val queryStartTime = System.currentTimeMillis()
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter = new Iterator[(K, V)] {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
if (carbonTable.isPartitionTable) {
carbonLoadModel.setTaskNo(String.valueOf(carbonSparkPartition.partitionId))
} else {
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
}
val partitionSpec = if (carbonTable.isHivePartitionTable) {
carbonSparkPartition.partitionSpec.get
} else {
null
}
var mergeStatus = false
var mergeNumber = ""
var exec: CarbonCompactionExecutor = null
var processor: AbstractResultProcessor = null
var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] = _
try {
// sorting the table block info List.
val splitList = if (null == rangeColumn || singleRange) {
// For a non-range column, or when the range column has only a single distinct value,
// use the splits assigned to this partition; only for a multi-valued range column do we
// use the broadcast splits (which contain all the splits)
carbonSparkPartition.split.value.getAllSplits
} else {
broadCastSplits.value
}
val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
Collections.sort(tableBlockInfoList)
// In the UPDATE DELTA COMPACTION case all the blocks received in compute belong to
// a single segment, so the max cardinality is taken from the last block of that segment
if (CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.campactionType) {
var dataFileFooter: DataFileFooter = null
try {
// As the tableBlockInfoList is sorted, take the column cardinality from the last
// block of the sorted list, as it will have the most recently updated cardinality.
// Blocks are sorted by update order using TableBlockInfo.compare, so the last
// block after the sort is the latest one.
dataFileFooter = CarbonUtil
.readMetadataFile(tableBlockInfoList.get(tableBlockInfoList.size() - 1))
} catch {
case e: IOException =>
logError("Exception in preparing the data file footer for compaction " + e.getMessage)
throw e
}
// the target load name is the same as the source load name in the case of update delta compaction
carbonMergerMapping.mergedLoadName = tableBlockInfoList.get(0).getSegmentId
carbonMergerMapping.maxSegmentColCardinality = dataFileFooter.getSegmentInfo
.getColumnCardinality
carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
.toList
}
mergeNumber = if (CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.campactionType) {
tableBlockInfoList.get(0).getSegment.toString
} else {
mergedLoadName.substring(
mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
CarbonCommonConstants.LOAD_FOLDER.length(),
mergedLoadName.length()
)
}
carbonLoadModel.setSegmentId(mergeNumber)
if (carbonTable.isHivePartitionTable) {
carbonLoadModel.setTaskNo(
CarbonScalaUtil.generateUniqueNumber(
theSplit.index,
mergeNumber.replace(".", ""), 0L))
}
CommonUtil.setTempStoreLocation(theSplit.index, carbonLoadModel, true, false)
// get the destination segment properties, as sent from the driver; these belong to the last segment.
val segmentProperties = new SegmentProperties(
carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
carbonMergerMapping.maxSegmentColCardinality)
val segmentMapping: java.util.Map[String, TaskBlockInfo] =
CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList,
carbonTable.getSortScope != SortScopeOptions.SortScope.NO_SORT)
carbonLoadModel.setTablePath(tablePath)
// check for restructured blocks
// TODO: this variable should be true only in the case of add/drop column
val restructuredBlockExists: Boolean = CarbonCompactionUtil
.checkIfAnyRestructuredBlockExists(segmentMapping,
dataFileMetadataSegMapping,
carbonTable.getTableLastUpdatedTime)
LOGGER.info(s"Restructured block exists: $restructuredBlockExists")
DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
carbonTable, dataFileMetadataSegMapping, restructuredBlockExists,
new SparkDataTypeConverterImpl)
// add task completion listener to clean up the resources
context.addTaskCompletionListener { _ =>
close()
}
try {
// fire a query and get the results.
var expr: expression.Expression = null
if (null != expressionMapForRangeCol) {
expr = expressionMapForRangeCol
.get(theSplit.asInstanceOf[CarbonSparkPartition].idx)
}
rawResultIteratorMap = exec.processTableBlocks(FileFactory.getConfiguration, expr)
} catch {
case e: Throwable =>
LOGGER.error(e)
if (null != e.getMessage) {
CarbonException.analysisException(
s"Exception occurred in query execution :: ${ e.getMessage }")
} else {
CarbonException.analysisException(
"Exception occurred in query execution.Please check logs.")
}
}
val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
carbonTable, carbonLoadModel.getTaskNo, mergeNumber, true, false)
if (carbonTable.getSortScope == SortScopeOptions.SortScope.NO_SORT ||
rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX).size() == 0) {
LOGGER.info("RowResultMergerProcessor flow is selected")
processor = new RowResultMergerProcessor(
databaseName,
factTableName,
segmentProperties,
tempStoreLoc,
carbonLoadModel,
carbonMergerMapping.campactionType,
partitionSpec)
} else {
LOGGER.info("CompactionResultSortProcessor flow is selected")
processor = new CompactionResultSortProcessor(
carbonLoadModel,
carbonTable,
segmentProperties,
carbonMergerMapping.campactionType,
factTableName,
partitionSpec)
}
mergeStatus = processor.execute(
rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX),
rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX))
mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber
} catch {
case e: Exception =>
LOGGER.error("Compaction Failed ", e)
throw e
}
private def close(): Unit = {
deleteLocalDataFolders()
// close the query executor service and clean up memory acquired during query processing
if (null != exec) {
LOGGER.info("Cleaning up query resources acquired during compaction")
exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX), queryStartTime)
exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX), queryStartTime)
}
// clean up the resources for processor
if (null != processor) {
LOGGER.info("Closing compaction processor instance to clean up loading resources")
processor.close()
}
}
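// deletes the temporary local store folders created for this compaction task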
private def deleteLocalDataFolders(): Unit = {
try {
LOGGER.info("Deleting local folder store location")
val isCompactionFlow = true
TableProcessingOperations
.deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow, false)
} catch {
case e: Exception =>
LOGGER.error(e)
}
}
var finished = false
override def hasNext: Boolean = {
!finished
}
override def next(): (K, V) = {
finished = true
result.getKey(mergeResult, mergeStatus)
}
}
iter
}
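// Plans the compaction tasks: collects the splits of all valid segments, filters out
// invalid or non-columnar blocks, groups the splits into tasks (by range-column ranges,
// by Hive partition, or by the split's task id), updates the merged column schema and
// cardinality, and maps the tasks onto the active executor nodes as CarbonSparkPartitions.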
override def internalGetPartitions: Array[Partition] = {
val startTime = System.currentTimeMillis()
val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
)
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
if (CarbonProperties.getInstance().isRangeCompactionAllowed &&
!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
// If the table is not a partition table then only we go for range column compaction flow
rangeColumn = carbonTable.getRangeColumn
}
val dataType: DataType = if (null != rangeColumn) {
rangeColumn.getDataType
} else {
null
}
val isRangeColSortCol = rangeColumn != null && rangeColumn.isDimension &&
rangeColumn.asInstanceOf[CarbonDimension].isSortColumn
val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
carbonTable)
val jobConf: JobConf = new JobConf(getConf)
SparkHadoopUtil.get.addCredentials(jobConf)
val job: Job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
CarbonInputFormat.setPartitionsToPrune(
job.getConfiguration,
carbonMergerMapping.currentPartitions.map(_.asJava).orNull)
CarbonInputFormat.setTableInfo(job.getConfiguration,
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
var updateDetails: UpdateVO = null
// initialise query_id for job
job.getConfiguration.set("query.id", queryId)
var defaultParallelism = sparkContext.defaultParallelism
val result = new java.util.ArrayList[Partition](defaultParallelism)
var taskPartitionNo = 0
var carbonPartitionId = 0
var noOfBlocks = 0
val taskInfoList = new java.util.ArrayList[Distributable]
var carbonInputSplits = mutable.ArrayBuffer[CarbonInputSplit]()
var allSplits = new java.util.ArrayList[InputSplit]
var splitsOfLastSegment: List[CarbonInputSplit] = null
// map for keeping the relation of a task and its blocks.
val taskIdMapping: java.util.Map[String, java.util.List[CarbonInputSplit]] = new
java.util.HashMap[String, java.util.List[CarbonInputSplit]]
var totalSize: Double = 0
var totalTaskCount: Integer = 0
var loadMetadataDetails: Array[LoadMetadataDetails] = null
// Only for a range column, read the load metadata to get the segment sizes
if (null != rangeColumn) {
loadMetadataDetails = SegmentStatusManager
.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath))
}
val validSegIds: java.util.List[String] = new util.ArrayList[String]()
// for each valid segment.
for (eachSeg <- carbonMergerMapping.validSegments) {
// For a range column, accumulate the segment size to calculate the number of ranges
if (null != rangeColumn) {
for (details <- loadMetadataDetails) {
if (details.getLoadName == eachSeg.getSegmentNo) {
totalSize = totalSize + (details.getDataSize.toDouble)
}
}
}
validSegIds.add(eachSeg.getSegmentNo)
}
// set the valid segments to be compacted in the job configuration
job.getConfiguration
.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, validSegIds.asScala.mkString(","))
val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
// get splits
val splits = format.getSplits(job)
// collect the columnar (V3) splits; used later to read the data file footer for segment properties
if (null != splits && splits.size > 0) {
splitsOfLastSegment = splits.asScala
.map(_.asInstanceOf[CarbonInputSplit])
.filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
}
val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { entry =>
val segmentId = Segment.toSegment(entry.getSegmentId).getSegmentNo
val blockInfo = new TableBlockInfo(entry.getFilePath,
entry.getStart, entry.getSegmentId,
entry.getLocations, entry.getLength, entry.getVersion,
updateStatusManager.getDeleteDeltaFilePath(
entry.getFilePath,
segmentId)
)
if (updateStatusManager.getUpdateStatusDetails.length != 0) {
updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId)
}
// keep only splits with the V3 columnar data file format
// if the segment has updates, also check that the block is still valid based on the update details
(!updated ||
(updated && (!CarbonUtil.isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
updateDetails, updateStatusManager)))) &&
FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
}
if (rangeColumn != null) {
totalTaskCount = totalTaskCount +
CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
}
carbonInputSplits ++= filteredSplits
allSplits.addAll(filteredSplits.asJava)
totalTaskCount = totalTaskCount / carbonMergerMapping.validSegments.size
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var allRanges: Array[Object] = new Array[Object](0)
if (rangeColumn != null) {
// Calculate the number of ranges to create; at least 2 ranges/tasks are created in any case.
// We take the minimum of the average number of tasks created at load time and the number
// of tasks derived from the total size.
val numOfPartitions = Math
.max(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL.toInt,
Math.min(totalTaskCount, DataLoadProcessBuilderOnSpark
.getNumPatitionsBasedOnSize(totalSize, carbonTable, carbonLoadModel, true)))
val colName = rangeColumn.getColName
LOGGER.info(s"Compacting on range column: $colName")
allRanges = getRangesFromRDD(rangeColumn,
carbonTable,
numOfPartitions,
allSplits,
dataType)
// If the RangePartitioner does not return ranges (e.g. when the data is skewed with
// a lot of null records), take the min/max from the footers and use them for the tasks
if (null == allRanges || allRanges.size == 1) {
allRanges = CarbonCompactionUtil.getOverallMinMax(carbonInputSplits.toList.toArray,
rangeColumn,
isRangeColSortCol)
if (allRanges(0) == allRanges(1)) {
// This is true only if the data contains a single value throughout
singleRange = true
}
}
LOGGER.info(s"Number of ranges:" + allRanges.size)
}
// prepare the details required to extract the segment properties using last segment.
if (null != splitsOfLastSegment && splitsOfLastSegment.size() > 0) {
val carbonInputSplit = splitsOfLastSegment.get(0)
var dataFileFooter: DataFileFooter = null
try {
dataFileFooter = CarbonUtil.readMetadataFile(
CarbonInputSplit.getTableBlockInfo(carbonInputSplit))
} catch {
case e: IOException =>
logError("Exception in preparing the data file footer for compaction " + e.getMessage)
throw e
}
}
val columnToCardinalityMap = new util.HashMap[java.lang.String, Integer]()
val partitionTaskMap = new util.HashMap[PartitionSpec, String]()
val counter = new AtomicInteger()
var indexOfRangeColumn = -1
var taskIdCount = 0
// Null values are already handled separately in the filter expression, so remove
// nulls from the ranges we get; otherwise they may lead to duplicate data
val newRanges = allRanges.filter { range =>
range != null
}
val noOfSplitsPerTask = Math.ceil(carbonInputSplits.size.toDouble / defaultParallelism)
var taskCount = 0
// If the range column contains only one distinct value, distribute the splits
// across different tasks to avoid creating a single task and overloading one executor
if (singleRange) {
carbonInputSplits.foreach { split =>
var dataFileFooter: DataFileFooter = null
try {
dataFileFooter = CarbonUtil.readMetadataFile(
CarbonInputSplit.getTableBlockInfo(split))
} catch {
case e: IOException =>
logError("Exception in preparing the data file footer for compaction " + e.getMessage)
throw e
}
// add all the columns and their cardinalities to the map
CarbonCompactionUtil
.addColumnCardinalityToMap(columnToCardinalityMap,
dataFileFooter.getColumnInTable,
dataFileFooter.getSegmentInfo.getColumnCardinality)
var splitList = taskIdMapping.get(taskCount.toString)
if (null != splitList && splitList.size == noOfSplitsPerTask) {
taskCount = taskCount + 1
splitList = taskIdMapping.get(taskCount.toString)
}
if (null == splitList) {
splitList = new util.ArrayList[CarbonInputSplit]()
taskIdMapping.put(taskCount.toString, splitList)
}
splitList.add(split)
}
} else {
carbonInputSplits.foreach { split =>
var dataFileFooter: DataFileFooter = null
if (null == rangeColumn) {
val taskNo = getTaskNo(split, partitionTaskMap, counter)
var sizeOfSplit = split.getLength
val splitList = taskIdMapping.get(taskNo)
noOfBlocks += 1
if (null == splitList) {
val splitTempList = new util.ArrayList[CarbonInputSplit]()
splitTempList.add(split)
taskIdMapping.put(taskNo, splitTempList)
} else {
splitList.add(split)
}
}
// Check the cardinality of each column and keep the highest.
try {
dataFileFooter = CarbonUtil.readMetadataFile(
CarbonInputSplit.getTableBlockInfo(split))
} catch {
case e: IOException =>
logError("Exception in preparing the data file footer for compaction " + e.getMessage)
throw e
}
// add all the columns and their cardinalities to the map
CarbonCompactionUtil
.addColumnCardinalityToMap(columnToCardinalityMap,
dataFileFooter.getColumnInTable,
dataFileFooter.getSegmentInfo.getColumnCardinality)
// Create taskIdMapping here for range column by reading min/max values.
if (null != rangeColumn) {
if (null == expressionMapForRangeCol) {
expressionMapForRangeCol = new util.HashMap[Integer, Expression]()
}
if (-1 == indexOfRangeColumn) {
val allColumns = dataFileFooter.getColumnInTable
for (i <- 0 until allColumns.size()) {
if (allColumns.get(i).getColumnName.equalsIgnoreCase(rangeColumn.getColName)) {
indexOfRangeColumn = i
}
}
}
// Create ranges and add splits to the tasks
for (i <- 0 until (newRanges.size + 1)) {
if (null == expressionMapForRangeCol.get(i)) {
// Creating FilterExpression for the range column
var minVal: Object = null
var maxVal: Object = null
// For the first task we create an Or filter that also accommodates null values
// For the last task we use a GreaterThan expression on the last range value
if (i != 0) {
minVal = newRanges(i - 1)
}
if (i != newRanges.size) {
maxVal = newRanges(i)
}
val filterExpr = CarbonCompactionUtil
.getFilterExpressionForRange(rangeColumn,
minVal, maxVal, dataType)
expressionMapForRangeCol.put(i, filterExpr)
}
var splitList = taskIdMapping.get(i.toString)
noOfBlocks += 1
if (null == splitList) {
splitList = new util.ArrayList[CarbonInputSplit]()
taskIdMapping.put(i.toString, splitList)
}
splitList.add(split)
}
}
}
if (null != rangeColumn) {
// Broadcast all splits to all the executors only when the range column has
// more than one unique value.
makeBroadCast(carbonInputSplits.asJava)
}
}
val updatedMaxSegmentColumnList = new util.ArrayList[ColumnSchema]()
// update cardinality and column schema list according to master schema
val cardinality = CarbonCompactionUtil
.updateColumnSchemaAndGetCardinality(columnToCardinalityMap,
carbonTable,
updatedMaxSegmentColumnList)
carbonMergerMapping.maxSegmentColumnSchemaList = updatedMaxSegmentColumnList.asScala.toList
// Set cardinality for new segment.
carbonMergerMapping.maxSegmentColCardinality = cardinality
taskIdMapping.asScala.foreach(
entry =>
taskInfoList
.add(new CarbonInputSplitTaskInfo(entry._1, entry._2).asInstanceOf[Distributable])
)
// get all the active nodes of the cluster and prepare the nodeBlockMap based on these nodes
val activeNodes = DistributionUtil
.ensureExecutorsAndGetNodeList(taskInfoList.asScala, sparkContext)
val nodeBlockMap = CarbonLoaderUtil.nodeBlockMapping(taskInfoList, -1, activeNodes.asJava)
val nodeTaskBlocksMap = new java.util.HashMap[String, java.util.List[NodeInfo]]()
val nodes = DistributionUtil.getNodeList(sparkContext)
logInfo("no.of.nodes where data present=" + nodeBlockMap.size())
defaultParallelism = sparkContext.defaultParallelism
val isPartitionTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isPartitionTable
// Create Spark Partition for each task and assign blocks
nodeBlockMap.asScala.foreach { case (nodeName, splitList) =>
val taskSplitList = new java.util.ArrayList[NodeInfo](0)
nodeTaskBlocksMap.put(nodeName, taskSplitList)
var blockletCount = 0
splitList.asScala.foreach { splitInfo =>
val splitsPerNode = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo]
blockletCount = blockletCount + splitsPerNode.getCarbonInputSplitList.size()
taskSplitList.add(
NodeInfo(splitsPerNode.getTaskId, splitsPerNode.getCarbonInputSplitList.size()))
if (blockletCount != 0) {
val taskInfo = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo]
val multiBlockSplit = if (null == rangeColumn || singleRange) {
new CarbonMultiBlockSplit(
taskInfo.getCarbonInputSplitList,
Array(nodeName))
} else {
var splitListForRange = new util.ArrayList[CarbonInputSplit]()
new CarbonMultiBlockSplit(
splitListForRange,
Array(nodeName))
}
if (isPartitionTable) {
carbonPartitionId = Integer.parseInt(taskInfo.getTaskId)
}
result.add(
new CarbonSparkPartition(
id,
taskPartitionNo,
multiBlockSplit,
carbonPartitionId,
getPartitionNamesFromTask(taskInfo.getTaskId, partitionTaskMap)))
taskPartitionNo += 1
}
}
}
// print the node info along with task and number of blocks for the task.
nodeTaskBlocksMap.asScala.foreach((entry: (String, List[NodeInfo])) => {
logInfo(s"for the node ${ entry._1 }")
for (elem <- entry._2.asScala) {
logInfo("Task ID is " + elem.TaskId + "no. of blocks is " + elem.noOfBlocks)
}
})
val noOfNodes = nodes.length
val noOfTasks = result.size
logInfo(s"Identified no.of.Blocks: $noOfBlocks," +
s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks")
logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
if (rangeColumn == null) {
for (j <- 0 until result.size) {
val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
val splitList = multiBlockSplit.getAllSplits
logInfo(s"Node: ${ multiBlockSplit.getLocations.mkString(",") }, No.Of Blocks: " +
s"${ CarbonInputSplit.createBlocks(splitList).size }")
}
}
result.toArray(new Array[Partition](result.size))
}
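// Samples the range column over all splits with a CarbonScanRDD and uses
// DataSkewRangePartitioner to compute the range boundaries that drive task creation.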
private def getRangesFromRDD(rangeColumn: CarbonColumn,
carbonTable: CarbonTable,
defaultParallelism: Int,
allSplits: java.util.ArrayList[InputSplit],
dataType: DataType): Array[Object] = {
val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
val projection = new CarbonProjection
projection.addColumn(rangeColumn.getColName)
val scanRdd = new CarbonScanRDD[InternalRow](
ss,
projection,
null,
carbonTable.getAbsoluteTableIdentifier,
carbonTable.getTableInfo.serialize(),
carbonTable.getTableInfo,
inputMetricsStats,
partitionNames = null,
splits = allSplits)
val objectOrdering: Ordering[Object] = createOrderingForColumn(rangeColumn)
val sparkDataType = Util.convertCarbonToSparkDataType(dataType)
// read the range column value using its actual Spark data type so that all data types are supported
val sampleRdd = scanRdd
.map(row => (row.get(0, sparkDataType), null))
val value = new DataSkewRangePartitioner(
defaultParallelism, sampleRdd, true)(objectOrdering, classTag[Object])
value.rangeBounds
}
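// Picks the ordering used for range boundary computation: dictionary-encoded dimensions
// (other than TIMESTAMP) compare on their integer surrogate keys, primitive columns use an
// ordering of their own data type, and all other columns fall back to string comparison.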
private def createOrderingForColumn(column: CarbonColumn): Ordering[Object] = {
if (column.isDimension) {
val dimension = column.asInstanceOf[CarbonDimension]
if ((dimension.isGlobalDictionaryEncoding || dimension.isDirectDictionaryEncoding) &&
dimension.getDataType != DataTypes.TIMESTAMP) {
new PrimtiveOrdering(DataTypes.INT)
} else {
if (DataTypeUtil.isPrimitiveColumn(column.getDataType)) {
new PrimtiveOrdering(column.getDataType)
} else {
new StringOrdering()
}
}
} else {
new PrimtiveOrdering(column.getDataType)
}
}
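// For Hive partitioned tables, assigns one synthetic task id per partition directory so that
// the blocks of a partition are compacted together; otherwise reuses the split's own task id.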
private def getTaskNo(
split: CarbonInputSplit,
partitionTaskMap: util.Map[PartitionSpec, String],
counter: AtomicInteger): String = {
if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
val path = split.getPath.getParent
val partTask =
carbonMergerMapping.currentPartitions.get.find(p => p.getLocation.equals(path)) match {
case Some(part) => part
case None =>
throw new UnsupportedOperationException("Cannot do compaction on dropped partition")
}
var task = partitionTaskMap.get(partTask)
if (task == null) {
task = counter.incrementAndGet().toString
partitionTaskMap.put(partTask, task)
}
task
} else {
split.taskId
}
}
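// Resolves the PartitionSpec that was mapped to the given task id; returns None for
// non-partitioned tables.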
private def getPartitionNamesFromTask(taskId: String,
partitionTaskMap: util.Map[PartitionSpec, String]): Option[PartitionSpec] = {
if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
Some(partitionTaskMap.asScala.find(f => f._2.equals(taskId)).get._1)
} else {
None
}
}
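// Prefers the nodes that host the blocks of this partition, ignoring the placeholder
// "localhost" location.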
override def getPreferredLocations(split: Partition): Seq[String] = {
val theSplit = split.asInstanceOf[CarbonSparkPartition]
theSplit.split.value.getLocations.filter(_ != "localhost")
}
}