/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.secondaryindex.rdd

import java.util
import java.util.Collections

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Random

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.execution.command.NodeInfo
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.secondaryindex.command.SecondaryIndex
import org.apache.spark.sql.secondaryindex.query.{CarbonSecondaryIndexExecutor, SecondaryIndexQueryResultProcessor}
import org.apache.spark.sql.secondaryindex.util.{SecondaryIndexCreationResult, SecondaryIndexUtil}

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo, TaskBlockInfo}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.rdd.{CarbonRDD, CarbonSparkPartition}
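
/**
 * RDD that builds the secondary index table data for one segment of the main table.
 * Each partition queries the main table blocks belonging to a single task and hands the
 * result rows to [[SecondaryIndexQueryResultProcessor]], which writes the index table
 * files for that segment.
 *
 * A rough usage sketch; the surrounding variable names and the
 * [[SecondaryIndexCreationResult]] implementation (`resultImpl`) are assumptions made for
 * illustration, not part of this file:
 * {{{
 *   val secondaryIndexRdd = new CarbonSecondaryIndexRDD(sparkSession, resultImpl,
 *     carbonLoadModel, secondaryIndex, segmentId, confExecutors, indexCarbonTable)
 *   val creationStatus = secondaryIndexRdd.collect()
 * }}}
 */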
class CarbonSecondaryIndexRDD[K, V](
@transient ss: SparkSession,
result: SecondaryIndexCreationResult[K, V],
carbonLoadModel: CarbonLoadModel,
secondaryIndex: SecondaryIndex,
segmentId: String,
confExecutorsTemp: String,
indexCarbonTable: CarbonTable,
forceAccessSegment: Boolean = false)
extends CarbonRDD[(K, V)](ss, Nil) {
private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
val defaultParallelism: Int = sparkContext.defaultParallelism
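  // run this job in the DDL scheduler pool and make its tasks interruptible on job cancel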
sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
sparkContext.setLocalProperty("spark.job.interruptOnCancel", "true")
var localStoreLocation: String = _
var columnCardinality: Array[Int] = Array[Int]()
val carbonStoreLocation: String = carbonLoadModel.getTablePath
val databaseName: String = carbonLoadModel.getDatabaseName
val factTableName: String = carbonLoadModel.getTableName
val tableId: String = carbonLoadModel
.getCarbonDataLoadSchema
.getCarbonTable
.getAbsoluteTableIdentifier
.getCarbonTableIdentifier
.getTableId
private val indexTable: CarbonTable = CarbonEnv.getCarbonTable(Some(databaseName),
secondaryIndex.indexTableName)(ss)
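  // column mappings between the main (fact) table and the index table, computed once on
  // the driver and passed to every task's query result processor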
val factToIndexColumnMapping: Array[Int] = SecondaryIndexUtil
.prepareColumnMappingOfFactToIndexTable(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
indexTable, isDictColsAlone = false)
val factToIndexDictColumnMapping: Array[Int] = SecondaryIndexUtil
.prepareColumnMappingOfFactToIndexTable(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
indexTable, isDictColsAlone = true)
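
  /**
   * Builds the secondary index data for one partition: fires a query over the blocks
   * assigned to this task, streams the result through the query result processor and
   * returns a single (key, value) record carrying the creation status for the segment.
   */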
override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter: Iterator[(K, V)] = new Iterator[(K, V)] {
var uniqueStatusId: String = segmentId + CarbonCommonConstants.UNDERSCORE +
theSplit.index
var secondaryIndexCreationStatus = false
var exec: CarbonSecondaryIndexExecutor = _
try {
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
// this property is used to determine whether temp location for carbon is inside
// container temp dir or is yarn application directory.
val carbonUseLocalDir = CarbonProperties.getInstance()
.getProperty("carbon.use.local.dir", "false")
if ("true".equalsIgnoreCase(carbonUseLocalDir)) {
val storeLocations = org.apache.carbondata.spark.util.Util
.getConfiguredLocalDirs(SparkEnv.get.conf)
if (null != storeLocations && storeLocations.nonEmpty) {
localStoreLocation = storeLocations(Random.nextInt(storeLocations.length))
}
if (localStoreLocation == null) {
localStoreLocation = System.getProperty("java.io.tmpdir")
}
        } else {
localStoreLocation = System.getProperty("java.io.tmpdir")
}
localStoreLocation = localStoreLocation + '/' + System.nanoTime() + '_' + theSplit.index
LOGGER.info("Temp storeLocation taken is " + localStoreLocation)
val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
// sorting the table block info List.
val splitList = carbonSparkPartition.split.value.getAllSplits
val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
Collections.sort(tableBlockInfoList)
val taskAndBlockMapping: TaskBlockInfo =
SecondaryIndexUtil.createTaskAndBlockMapping(tableBlockInfoList)
exec = new CarbonSecondaryIndexExecutor(taskAndBlockMapping,
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
secondaryIndex.columnNames.asJava,
new SparkDataTypeConverterImpl
)
// fire a query and get the results.
val queryResultIterators = exec.processTableBlocks()
// carbonLoadModel.setPartitionId("0")
carbonLoadModel.setSegmentId(segmentId)
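        // publish the task specific temp folder under the key that the data processing
        // layer uses to resolve the temp store location for this segment and task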
val tempLocationKey = CarbonDataProcessorUtil
.getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
secondaryIndex.indexTableName,
carbonLoadModel.getSegmentId,
carbonLoadModel.getTaskNo,
false,
false)
CarbonProperties.getInstance().addProperty(tempLocationKey, localStoreLocation)
val secondaryIndexQueryResultProcessor: SecondaryIndexQueryResultProcessor = new
SecondaryIndexQueryResultProcessor(
carbonLoadModel,
columnCardinality,
segmentId,
indexCarbonTable,
factToIndexColumnMapping,
factToIndexDictColumnMapping)
context.addTaskCompletionListener { context =>
if (null != secondaryIndexQueryResultProcessor) {
secondaryIndexQueryResultProcessor.close()
}
}
secondaryIndexQueryResultProcessor.processQueryResult(queryResultIterators)
secondaryIndexCreationStatus = true
} catch {
case ex: Throwable =>
LOGGER.error("Exception occurred in secondary index creation rdd: " + ex)
throw ex
} finally {
if (null != exec) {
exec.finish()
}
}
var finished = false
override def hasNext: Boolean = {
!finished
}
override def next(): (K, V) = {
finished = true
result.getKey(segmentId, secondaryIndexCreationStatus)
}
}
iter
}
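
  /**
   * Prefer the nodes that host the blocks of this partition, ignoring "localhost"
   * placeholders.
   */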
override def getPreferredLocations(split: Partition): Seq[String] = {
val theSplit = split.asInstanceOf[CarbonSparkPartition]
theSplit.split.value.getLocations.filter(_ != "localhost")
}
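
  /**
   * Plans one Spark partition per main table task: fetches the input splits of the
   * segment, groups them by task id, maps the tasks to the active executor nodes and
   * wraps each task's splits into a [[CarbonMultiBlockSplit]].
   */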
override def internalGetPartitions: Array[Partition] = {
val startTime = System.currentTimeMillis()
val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
carbonStoreLocation, databaseName, factTableName, tableId)
val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)
val jobConf: JobConf = new JobConf(hadoopConf)
SparkHadoopUtil.get.addCredentials(jobConf)
    val job: Job = Job.getInstance(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
// initialise query_id for job
job.getConfiguration.set("query.id", queryId)
var defaultParallelism = sparkContext.defaultParallelism
val result = new java.util.ArrayList[Partition](defaultParallelism)
var partitionNo = 0
var columnSize = 0
var noOfBlocks = 0
// mapping of the node and block list.
var nodeBlockMapping: java.util.Map[String, java.util.List[Distributable]] = new
java.util.HashMap[String, java.util.List[Distributable]]
val taskInfoList = new java.util.ArrayList[Distributable]
var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
var splitsOfLastSegment: util.List[CarbonInputSplit] = null
// map for keeping the relation of a task and its blocks.
val taskIdMapping: java.util.Map[String, java.util.List[CarbonInputSplit]] = new
java.util.HashMap[String, java.util.List[CarbonInputSplit]]
    // restrict the job to the given segment and disable validation of the segments to access
job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, segmentId)
CarbonInputFormat.setValidateSegmentsToAccess(job.getConfiguration, false)
// get splits
val splits = format.getSplits(job)
// keep on assigning till last one is reached.
if (!splits.isEmpty) {
splitsOfLastSegment = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).toList.asJava
carbonInputSplits ++= splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
carbonInputSplits.foreach(splits => {
val taskNo = splits.taskId
val splitList = taskIdMapping.get(taskNo)
noOfBlocks += 1
if (null == splitList) {
val splitTempList = new util.ArrayList[CarbonInputSplit]()
splitTempList.add(splits)
taskIdMapping.put(taskNo, splitTempList)
} else {
splitList.add(splits)
}
}
)
taskIdMapping.asScala.foreach(
entry =>
taskInfoList
.add(new CarbonInputSplitTaskInfo(entry._1, entry._2).asInstanceOf[Distributable])
)
// get all the active nodes of cluster and prepare the nodeBlockMap based on these nodes
val activeNodes = DistributionUtil
.ensureExecutorsAndGetNodeList(taskInfoList.asScala, sparkContext)
val nodeBlockMap = CarbonLoaderUtil.nodeBlockMapping(taskInfoList, -1, activeNodes.asJava)
val nodeTaskBlocksMap = new java.util.HashMap[String, java.util.List[NodeInfo]]()
val nodes = DistributionUtil.getNodeList(sparkContext)
logInfo("no.of.nodes where data present=" + nodeBlockMap.size())
defaultParallelism = sparkContext.defaultParallelism
// Create Spark Partition for each task and assign blocks
nodeBlockMap.asScala.foreach { case (nodeName, splitList) =>
val taskSplitList = new java.util.ArrayList[NodeInfo](0)
nodeTaskBlocksMap.put(nodeName, taskSplitList)
var blockletCount = 0
splitList.asScala.foreach { splitInfo =>
val splitsPerNode = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo]
blockletCount = blockletCount + splitsPerNode.getCarbonInputSplitList.size()
taskSplitList.add(
NodeInfo(splitsPerNode.getTaskId, splitsPerNode.getCarbonInputSplitList.size()))
if (blockletCount != 0) {
val multiBlockSplit = new CarbonMultiBlockSplit(
splitInfo.asInstanceOf[CarbonInputSplitTaskInfo].getCarbonInputSplitList,
Array(nodeName))
result.add(new CarbonSparkPartition(id, partitionNo, multiBlockSplit))
partitionNo += 1
}
}
}
// print the node info along with task and number of blocks for the task.
nodeTaskBlocksMap.asScala.foreach((entry: (String, util.List[NodeInfo])) => {
logInfo(s"for the node ${ entry._1 }")
for (elem <- entry._2.asScala) {
logInfo("Task ID is " + elem.TaskId + "no. of blocks is " + elem.noOfBlocks)
}
})
val noOfNodes = nodes.length
val noOfTasks = result.size
logInfo(s"Identified no.of.Blocks: $noOfBlocks," +
s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: " +
s"$noOfTasks")
logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
for (j <- 0 until result.size) {
val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
val splitList = multiBlockSplit.getAllSplits
val tableBlocks: util.List[TableBlockInfo] = CarbonInputSplit.createBlocks(splitList)
val tableBlocksSize: Int = tableBlocks.size
if (tableBlocksSize > 0) {
// read the footer and get column cardinality which will be same for all tasks in a
// segment
          val dataFileFooter: DataFileFooter = SecondaryIndexUtil
            .readFileFooter(tableBlocks.get(tableBlocks.size() - 1))
          columnCardinality = dataFileFooter.getSegmentInfo.getColumnCardinality
}
logInfo(s"Node: ${ multiBlockSplit.getLocations.mkString(",") }, No.Of Blocks: " +
s"${ CarbonInputSplit.createBlocks(splitList).size }")
}
result.toArray(new Array[Partition](result.size))
} else {
new Array[Partition](0)
}
}
}