/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.secondaryindex.rdd
import java.util
import java.util.concurrent.Callable
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.CarbonMergeFilesRDD
import org.apache.spark.sql.{CarbonEnv, SQLContext}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.spark.sql.secondaryindex.command.SecondaryIndexModel
import org.apache.spark.sql.secondaryindex.events.{LoadTableSIPostExecutionEvent, LoadTableSIPreExecutionEvent}
import org.apache.spark.sql.secondaryindex.util.{FileInternalUtil, SecondaryIndexCreationResultImpl, SecondaryIndexUtil}
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.indexserver.DistributedRDDUtils
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.CarbonLoaderUtil
/**
 * This object creates the secondary index for the specified segments
 */
object SecondaryIndexCreator {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
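/**
 * Creates the secondary index for the given segments. Acquires a segment lock per segment,
 * triggers the SI load for the locked segments in parallel, and updates the SI table status
 * based on the per-segment result. For the compaction flow the acquired segment locks are
 * returned to the caller; for the load flow they are released here.
 */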
def createSecondaryIndex(secondaryIndexModel: SecondaryIndexModel,
segmentToLoadStartTimeMap: java.util.Map[String, String],
indexTable: CarbonTable,
forceAccessSegment: Boolean = false,
isCompactionCall: Boolean,
isLoadToFailedSISegments: Boolean): (CarbonTable, ListBuffer[ICarbonLock], OperationContext) = {
var indexCarbonTable = indexTable
val sc = secondaryIndexModel.sqlContext
// get the thread pool size for secondary index creation
val threadPoolSize = getThreadPoolSize(sc)
LOGGER
.info(s"Configured thread pool size for distributing segments in secondary index creation " +
s"is $threadPoolSize")
// create an executor service to process the segments in parallel
val executorService = java.util.concurrent.Executors.newFixedThreadPool(threadPoolSize)
if (null == indexCarbonTable) {
// look up the relation only if the index table was not passed by the caller
val metastore = CarbonEnv.getInstance(secondaryIndexModel.sqlContext.sparkSession)
.carbonMetaStore
indexCarbonTable = metastore
.lookupRelation(Some(secondaryIndexModel.carbonLoadModel.getDatabaseName),
secondaryIndexModel.secondaryIndex.indexName)(secondaryIndexModel.sqlContext
.sparkSession).carbonTable
}
val operationContext = new OperationContext
val loadTableSIPreExecutionEvent: LoadTableSIPreExecutionEvent =
LoadTableSIPreExecutionEvent(secondaryIndexModel.sqlContext.sparkSession,
new CarbonTableIdentifier(indexCarbonTable.getDatabaseName,
indexCarbonTable.getTableName, ""),
secondaryIndexModel.carbonLoadModel,
indexCarbonTable)
OperationListenerBus.getInstance
.fireEvent(loadTableSIPreExecutionEvent, operationContext)
var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
val validSegments: java.util.List[String] = new util.ArrayList[String]()
val skippedSegments: java.util.List[String] = new util.ArrayList[String]()
var validSegmentList = List.empty[String]
try {
for (eachSegment <- secondaryIndexModel.validSegments) {
// take a segment lock before starting the actual load, so that clearing the invalid
// segments in a parallel load to the SI will not clear the currently loading valid segment,
// and this new segment will not be added as a failed segment in
// SILoadEventListenerForFailedSegments, since the failed segments list
// is validated based on the segment lock
val segmentLock = CarbonLockFactory
.getCarbonLockObj(indexCarbonTable.getAbsoluteTableIdentifier,
CarbonTablePath.addSegmentPrefix(eachSegment) + LockUsage.LOCK)
if (segmentLock.lockWithRetries(1, 0)) {
segmentLocks += segmentLock
// add only the segments for which we are able to acquire the segment lock, and trigger
// loading for those segments. if some segments are skipped, the load for the
// skipped segments will be handled by SILoadEventListenerForFailedSegments
validSegments.add(eachSegment)
} else {
skippedSegments.add(eachSegment)
LOGGER.error(s"Not able to acquire the segment lock for table" +
s" ${indexCarbonTable.getTableUniqueName} for segment: $eachSegment. " +
s"Skipping this segment from loading.")
}
}
validSegmentList = validSegments.asScala.toList
if (validSegmentList.isEmpty) {
return (indexCarbonTable, segmentLocks, operationContext)
}
LOGGER.info(s"${indexCarbonTable.getTableUniqueName}: SI loading is started " +
s"for segments: $validSegmentList")
FileInternalUtil
.updateTableStatus(validSegmentList,
secondaryIndexModel.carbonLoadModel.getDatabaseName,
secondaryIndexModel.secondaryIndex.indexName,
SegmentStatus.INSERT_IN_PROGRESS,
secondaryIndexModel.segmentIdToLoadStartTimeMapping,
new java.util.HashMap[String, String](),
indexCarbonTable,
sc.sparkSession)
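// determine the number of executor instances to pass down to the SI creation RDD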
var execInstance = "1"
// in case of non-dynamic executor allocation, the number of executors is fixed.
if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
LOGGER.info("spark.executor.instances property is set to " + execInstance)
}
// in case of dynamic executor allocation, take the maximum number of executors
// allowed by the dynamic allocation.
else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) {
if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
.equalsIgnoreCase("true")) {
execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
LOGGER.info("spark.dynamicAllocation.maxExecutors property is set to " + execInstance)
}
}
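// submit one task per valid segment: each task creates the SI data files for its segment,
// writes the segment file and returns the (segmentId, status) pairs for that segment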
var futureObjectList = List[java.util.concurrent.Future[Array[(String, Boolean)]]]()
for (eachSegment <- validSegmentList) {
val segId = eachSegment
futureObjectList :+= executorService.submit(new Callable[Array[(String, Boolean)]] {
@throws(classOf[Exception])
override def call(): Array[(String, Boolean)] = {
ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo
.put("carbonConf", SparkSQLUtil.sessionState(sc.sparkSession).newHadoopConf())
var eachSegmentSecondaryIndexCreationStatus: Array[(String, Boolean)] = Array.empty
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(segId, indexCarbonTable)
val carbonLoadModel = getCopyObject(secondaryIndexModel)
carbonLoadModel
.setFactTimeStamp(secondaryIndexModel.segmentIdToLoadStartTimeMapping(eachSegment))
carbonLoadModel.setTablePath(secondaryIndexModel.carbonTable.getTablePath)
val secondaryIndexCreationStatus = new CarbonSecondaryIndexRDD(sc.sparkSession,
new SecondaryIndexCreationResultImpl,
carbonLoadModel,
secondaryIndexModel.secondaryIndex,
segId, execInstance, indexCarbonTable, forceAccessSegment, isCompactionCall).collect()
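// persist the segment file for the newly written SI segment and record its load start time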
SegmentFileStore
.writeSegmentFile(indexCarbonTable,
segId,
String.valueOf(carbonLoadModel.getFactTimeStamp))
segmentToLoadStartTimeMap.put(segId, carbonLoadModel.getFactTimeStamp.toString)
if (secondaryIndexCreationStatus.length > 0) {
eachSegmentSecondaryIndexCreationStatus = secondaryIndexCreationStatus
}
eachSegmentSecondaryIndexCreationStatus
}
})
}
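// collect the per-segment results and group them by the success flag returned by each task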
val segmentSecondaryIndexCreationStatus = futureObjectList.filter(_.get().length > 0)
.groupBy(_.get().head._2)
val hasSuccessSegments = segmentSecondaryIndexCreationStatus.contains(true)
val hasFailedSegments = segmentSecondaryIndexCreationStatus.contains(false)
var successSISegments: List[String] = List()
var failedSISegments: List[String] = List()
if (hasSuccessSegments) {
successSISegments = segmentSecondaryIndexCreationStatus(true).map(_.get().head._1)
}
if (hasFailedSegments) {
// if the call is from compaction, we need to fail the main table compaction as well; if
// the load was triggered from SILoadEventListener for a corresponding main table segment,
// then an SI load failure must also fail the main table load, so throw an exception.
// if the load was triggered from SI creation or SILoadEventListenerForFailedSegments,
// there is no need to fail: just mark the segment as marked for delete, so that the next
// load to the main table will take care of it
if (isCompactionCall || !isLoadToFailedSISegments) {
throw new Exception("Secondary index creation failed")
} else {
failedSISegments = segmentSecondaryIndexCreationStatus(false).map(_.get().head._1)
}
}
// only the segments whose load failed need to have their status marked for delete;
// the remaining segments stay as SUCCESS
var tableStatusUpdateForSuccess = false
var tableStatusUpdateForFailure = false
if (successSISegments.nonEmpty && !isCompactionCall) {
tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
successSISegments,
secondaryIndexModel.carbonLoadModel.getDatabaseName,
secondaryIndexModel.secondaryIndex.indexName,
SegmentStatus.INSERT_IN_PROGRESS,
secondaryIndexModel.segmentIdToLoadStartTimeMapping,
segmentToLoadStartTimeMap,
indexCarbonTable,
secondaryIndexModel.sqlContext.sparkSession)
// merge index files for the successful segments (only in the load flow)
CarbonMergeFilesRDD.mergeIndexFiles(secondaryIndexModel.sqlContext.sparkSession,
successSISegments,
segmentToLoadStartTimeMap,
indexCarbonTable.getTablePath,
indexCarbonTable, mergeIndexProperty = false)
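// read back the table status and keep only the entries of the segments loaded in this run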
val loadMetadataDetails = SegmentStatusManager
.readLoadMetadata(indexCarbonTable.getMetadataPath)
.filter(loadMetadataDetail => successSISegments.contains(loadMetadataDetail.getLoadName))
val carbonLoadModelForMergeDataFiles = SecondaryIndexUtil
.getCarbonLoadModel(indexCarbonTable,
loadMetadataDetails.toList.asJava,
System.currentTimeMillis(),
CarbonIndexUtil
.getCompressorForIndexTable(indexCarbonTable, secondaryIndexModel.carbonTable))
// merge the data files of the loaded segments; index file merging is
// also handled inside this call if needed
val rebuiltSegments = SecondaryIndexUtil
.mergeDataFilesSISegments(secondaryIndexModel.segmentIdToLoadStartTimeMapping,
indexCarbonTable,
loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
successSISegments,
secondaryIndexModel.carbonLoadModel.getDatabaseName,
secondaryIndexModel.secondaryIndex.indexName,
SegmentStatus.SUCCESS,
secondaryIndexModel.segmentIdToLoadStartTimeMapping,
segmentToLoadStartTimeMap,
indexCarbonTable,
secondaryIndexModel.sqlContext.sparkSession,
carbonLoadModelForMergeDataFiles.getFactTimeStamp,
rebuiltSegments)
}
if (!isCompactionCall) {
// Index PrePriming for SI
DistributedRDDUtils.triggerPrepriming(secondaryIndexModel.sqlContext.sparkSession,
indexCarbonTable,
Seq(),
operationContext,
FileFactory.getConfiguration,
validSegments.asScala.toList)
}
// update the status of all failed segments to marked for delete, so that the next SI load,
// triggered in the post event of the main table data load, clears all the marked for delete
// segments and retriggers the load for those segments
if (failedSISegments.nonEmpty && !isCompactionCall) {
tableStatusUpdateForFailure = FileInternalUtil.updateTableStatus(
failedSISegments,
secondaryIndexModel.carbonLoadModel.getDatabaseName,
secondaryIndexModel.secondaryIndex.indexName,
SegmentStatus.MARKED_FOR_DELETE,
secondaryIndexModel.segmentIdToLoadStartTimeMapping,
segmentToLoadStartTimeMap,
indexCarbonTable,
secondaryIndexModel.sqlContext.sparkSession)
}
if (failedSISegments.nonEmpty) {
LOGGER.error("Dataload to secondary index creation has failed")
}
if (!isCompactionCall) {
val loadTableSIPostExecutionEvent: LoadTableSIPostExecutionEvent =
LoadTableSIPostExecutionEvent(sc.sparkSession,
indexCarbonTable.getCarbonTableIdentifier,
secondaryIndexModel.carbonLoadModel,
indexCarbonTable)
OperationListenerBus.getInstance
.fireEvent(loadTableSIPostExecutionEvent, operationContext)
}
if (isCompactionCall) {
(indexCarbonTable, segmentLocks, operationContext)
} else {
(indexCarbonTable, ListBuffer.empty, operationContext)
}
} catch {
case ex: Exception =>
LOGGER.error("Load to SI table failed", ex)
FileInternalUtil
.updateTableStatus(validSegmentList,
secondaryIndexModel.carbonLoadModel.getDatabaseName,
secondaryIndexModel.secondaryIndex.indexName,
SegmentStatus.MARKED_FOR_DELETE,
secondaryIndexModel.segmentIdToLoadStartTimeMapping,
new java.util.HashMap[String, String](),
indexCarbonTable,
sc.sparkSession)
throw ex
} finally {
// if some segments were skipped, disable the SI table so that
// SILoadEventListenerForFailedSegments takes care of loading these segments during the
// next load to the main table
if (!skippedSegments.isEmpty) {
secondaryIndexModel.sqlContext.sparkSession.sql(
s"""ALTER TABLE ${
secondaryIndexModel
.carbonLoadModel
.getDatabaseName
}.${ secondaryIndexModel.secondaryIndex.indexName } SET
|SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin).collect()
}
try {
if (!isCompactionCall) {
SegmentStatusManager
.deleteLoadsAndUpdateMetadata(indexCarbonTable, false, null)
}
} catch {
case e: Exception =>
LOGGER
.error("Problem while cleaning up stale folder for index table " +
secondaryIndexModel.secondaryIndex.indexName, e)
}
// close the executor service
if (null != executorService) {
executorService.shutdownNow()
}
// release the segment locks only for load flow
if (!isCompactionCall) {
segmentLocks.foreach(segmentLock => {
segmentLock.unlock()
})
}
}
}
/**
 * Returns a copy of the CarbonLoadModel held by the given SecondaryIndexModel, with the
 * column compressor and fact timestamp set for the index table
 */
def getCopyObject(secondaryIndexModel: SecondaryIndexModel): CarbonLoadModel = {
val carbonLoadModel = secondaryIndexModel.carbonLoadModel
val copyObj = new CarbonLoadModel
copyObj.setTableName(carbonLoadModel.getTableName)
copyObj.setDatabaseName(carbonLoadModel.getDatabaseName)
copyObj.setLoadMetadataDetails(carbonLoadModel.getLoadMetadataDetails)
copyObj.setCarbonDataLoadSchema(carbonLoadModel.getCarbonDataLoadSchema)
val indexTable = CarbonEnv.getCarbonTable(
Some(carbonLoadModel.getDatabaseName),
secondaryIndexModel.secondaryIndex.indexName)(secondaryIndexModel.sqlContext.sparkSession)
copyObj.setColumnCompressor(
CarbonIndexUtil.getCompressorForIndexTable(
indexTable, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable))
copyObj.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
copyObj
}
/**
 * This method gets the configured thread pool size, which decides the number of
 * segments to run in parallel for secondary index creation
 */
def getThreadPoolSize(sqlContext: SQLContext): Int = {
var threadPoolSize: Int = 0
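// read the configured thread count and clamp it between the default and the allowed maximum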
try {
threadPoolSize = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_SECONDARY_INDEX_CREATION_THREADS,
CarbonCommonConstants.CARBON_SECONDARY_INDEX_CREATION_THREADS_DEFAULT).toInt
if (threadPoolSize >
CarbonCommonConstants.CARBON_SECONDARY_INDEX_CREATION_THREADS_MAX) {
threadPoolSize = CarbonCommonConstants.CARBON_SECONDARY_INDEX_CREATION_THREADS_MAX
LOGGER
.info(s"Configured thread pool size for secondary index creation is greater than " +
s"the allowed maximum. Therefore the maximum value will be considered: $threadPoolSize")
} else {
val defaultThreadPoolSize =
CarbonCommonConstants.CARBON_SECONDARY_INDEX_CREATION_THREADS_DEFAULT.toInt
if (threadPoolSize < defaultThreadPoolSize) {
threadPoolSize = defaultThreadPoolSize
LOGGER
.info(s"Configured thread pool size for secondary index creation is less than the " +
s"default. Therefore the default value will be considered: $threadPoolSize")
}
}
} catch {
case _: NumberFormatException =>
threadPoolSize = CarbonCommonConstants
.CARBON_SECONDARY_INDEX_CREATION_THREADS_DEFAULT.toInt
LOGGER
.info(s"Configured thread pool size for secondary index creation is not a valid number. " +
s"Therefore the default value will be considered: $threadPoolSize")
}
threadPoolSize
}
}