/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.rdd

import java.util
import java.util.{Collections, List}
import java.util.concurrent.ExecutorService

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{InputSplit, Job}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.{CarbonUtils, SparkSession, SQLContext}
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
import org.apache.spark.sql.execution.command.management.CommonLoadUtils
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.util.{CollectionAccumulator, MergeIndexUtil}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.CarbonInputSplit
import org.apache.carbondata.indexserver.{DistributedRDDUtils, IndexServer}
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
import org.apache.carbondata.spark.MergeResultImpl
import org.apache.carbondata.view.MVManagerInSpark

/**
 * Performs compaction (minor, major, custom and IUD) on a carbon table: identifies the segments
 * to merge, runs the merge job and updates the table status with the new compacted load.
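 *
 * A minimal usage sketch (the surrounding model objects are assumed to be prepared by the
 * caller, e.g. by the compaction command flow):
 * {{{
 *   val compactor = new CarbonTableCompactor(carbonLoadModel, compactionModel, executor,
 *     sqlContext, storeLocation, compactedSegments, operationContext)
 *   compactor.executeCompaction()
 * }}}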
*/
class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
compactionModel: CompactionModel,
executor: ExecutorService,
sqlContext: SQLContext,
storeLocation: String,
compactedSegments: List[String],
operationContext: OperationContext)
extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
private def needSortSingleSegment(
loadsToMerge: java.util.List[LoadMetadataDetails]): Boolean = {
    // support re-sorting a single old segment that was written with older sort_columns
if (CompactionType.CUSTOM == compactionModel.compactionType &&
loadsToMerge.size() == 1 &&
SortScope.NO_SORT != compactionModel.carbonTable.getSortScope) {
!CarbonCompactionUtil.isSortedByCurrentSortColumns(
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, loadsToMerge.get(0))
} else {
false
}
  }

override def executeCompaction(): Unit = {
val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
carbonLoadModel.getLoadMetadataDetails.asScala.filter(_.isCarbonFormat).asJava
)
CarbonDataMergerUtil.sortSegments(sortedSegments)
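    // Keep merging as long as more than one load qualifies. A single load is still merged when it
    // needs re-sorting (CUSTOM compaction) or when IUD_UPDDEL_DELTA compaction has loads to merge.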
var loadsToMerge = identifySegmentsToBeMerged()
while (loadsToMerge.size() > 1 || needSortSingleSegment(loadsToMerge) ||
(CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
loadsToMerge.size() > 0)) {
val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
deletePartialLoadsInCompaction()
val compactedLoad = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
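      // Take a segment level lock on every load participating in this merge so that concurrent
      // operations cannot modify these segments while the merge is running.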
var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
loadsToMerge.asScala.foreach { segmentId =>
val segmentLock = CarbonLockFactory
.getCarbonLockObj(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
.getAbsoluteTableIdentifier,
CarbonTablePath.addSegmentPrefix(segmentId.getLoadName) + LockUsage.LOCK)
segmentLock.lockWithRetries()
segmentLocks += segmentLock
}
try {
scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments, compactedLoad)
} catch {
case e: Exception =>
LOGGER.error(s"Exception in compaction thread ${ e.getMessage }", e)
          // In case of an exception, clear the cache loaded in the driver and, if enabled, in the
          // index server. Consider a scenario where the listener is called to load an SI table
          // after compaction of the main table: the SI load caches the newly compacted main table
          // load because it needs to select data from the main table. If that SI load then fails,
          // the cached entry has to be cleared.
val compactedLoadToClear = compactedLoad.substring(
compactedLoad.lastIndexOf(CarbonCommonConstants.UNDERSCORE) + 1)
if (!CarbonProperties.getInstance()
.isDistributedPruningEnabled(carbonLoadModel.getDatabaseName,
carbonLoadModel.getTableName)) {
IndexStoreManager.getInstance()
.clearInvalidSegments(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
Collections.singletonList(compactedLoadToClear))
} else {
IndexServer.getClient
.invalidateSegmentCache(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
Array(compactedLoadToClear))
}
throw e
} finally {
segmentLocks.foreach { segmentLock =>
segmentLock.unlock()
}
}
// scan again and determine if anything is there to merge again.
carbonLoadModel.readAndSetLoadMetadataDetails()
var segList = carbonLoadModel.getLoadMetadataDetails
      // In case of major compaction we scan only once and exit; otherwise major compaction would
      // keep running on the newly added loads as well, so the newly added segments are excluded.
if (CompactionType.MAJOR == compactionModel.compactionType) {
segList = CarbonDataMergerUtil
.filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
}
if (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType ||
CompactionType.CUSTOM == compactionModel.compactionType) {
loadsToMerge.clear()
} else if (segList.size > 0) {
loadsToMerge = identifySegmentsToBeMerged()
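        // Partitioned tables write the merged segment file into a temp folder named after the
        // fact timestamp, so refresh it before the next round of merging.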
if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
}
      } else {
        loadsToMerge.clear()
      }
}
  }

/**
 * Submits the loads to be merged to the executor.
*/
def scanSegmentsAndSubmitJob(loadsToMerge: util.List[LoadMetadataDetails],
compactedSegments: List[String], mergedLoadName: String): Unit = {
loadsToMerge.asScala.foreach { seg =>
      LOGGER.info("load identified for merge is " + seg.getLoadName)
}
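    // Package the table, the loads to merge and the compaction type into a callable model and
    // trigger the actual merge.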
val compactionCallableModel = CompactionCallableModel(
carbonLoadModel,
compactionModel.carbonTable,
loadsToMerge,
sqlContext,
compactionModel.compactionType,
compactionModel.currentPartitions,
compactedSegments)
    triggerCompaction(compactionCallableModel, mergedLoadName)
  }

private def triggerCompaction(compactionCallableModel: CompactionCallableModel,
mergedLoadName: String): Unit = {
val carbonTable = compactionCallableModel.carbonTable
val loadsToMerge = compactionCallableModel.loadsToMerge
val sc = compactionCallableModel.sqlContext
val carbonLoadModel = compactionCallableModel.carbonLoadModel
val compactionType = compactionCallableModel.compactionType
val partitions = compactionCallableModel.currentPartitions
val tablePath = carbonLoadModel.getTablePath
val startTime = System.nanoTime()
val mergedLoads = compactionCallableModel.compactedSegments
mergedLoads.add(mergedLoadName)
var finalMergeStatus = false
val databaseName: String = carbonLoadModel.getDatabaseName
val factTableName = carbonLoadModel.getTableName
val validSegments: List[Segment] = CarbonDataMergerUtil.getValidSegments(loadsToMerge)
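    // Collect everything the merger RDD needs (table paths, the merged load name and the valid
    // source segments) into a CarbonMergerMapping.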
val carbonMergerMapping = CarbonMergerMapping(
tablePath,
carbonTable.getMetadataPath,
mergedLoadName,
databaseName,
factTableName,
validSegments.asScala.toArray,
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
compactionType,
maxSegmentColumnSchemaList = null,
currentPartitions = partitions)
carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
carbonLoadModel.setLoadMetadataDetails(
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath).toList.asJava)
// trigger event for compaction
val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
AlterTableCompactionPreEvent(sqlContext.sparkSession,
carbonTable,
carbonMergerMapping,
mergedLoadName)
OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
    // Fire the pre-execution event for the table's CG and FG indexes
val tableIndexes = IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)
val indexOperationContext = new OperationContext()
if (null != tableIndexes) {
val indexNames: mutable.Buffer[String] =
tableIndexes.asScala.map(index => index.getIndexSchema.getIndexName)
val indexPreExecutionEvent: BuildIndexPreExecutionEvent =
new BuildIndexPreExecutionEvent(sqlContext.sparkSession,
carbonTable.getAbsoluteTableIdentifier, indexNames)
OperationListenerBus.getInstance().fireEvent(indexPreExecutionEvent,
indexOperationContext)
}
// accumulator to collect segment metadata
val segmentMetaDataAccumulator = sqlContext
.sparkContext
.collectionAccumulator[Map[String, SegmentMetaDataInfo]]
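    // Choose the merge strategy:
    //   - IUD_UPDDEL_DELTA compaction is handled by CarbonIUDMergerRDD
    //   - standard tables with global sort scope, sort columns and no range column are compacted
    //     by re-loading the data through the global sort flow
    //   - everything else goes through the regular CarbonMergerRDD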
val mergeStatus =
if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
new CarbonIUDMergerRDD(
sc.sparkSession,
new MergeResultImpl(),
carbonLoadModel,
carbonMergerMapping,
segmentMetaDataAccumulator
).collect
} else if (SortScope.GLOBAL_SORT == carbonTable.getSortScope &&
!carbonTable.getSortColumns.isEmpty &&
carbonTable.getRangeColumn == null &&
CarbonUtil.isStandardCarbonTable(carbonTable)) {
compactSegmentsByGlobalSort(sc.sparkSession,
carbonLoadModel,
carbonMergerMapping,
segmentMetaDataAccumulator)
} else {
new CarbonMergerRDD(
sc.sparkSession,
new MergeResultImpl(),
carbonLoadModel,
carbonMergerMapping,
segmentMetaDataAccumulator
).collect
}
if (mergeStatus.length == 0) {
finalMergeStatus = false
} else {
finalMergeStatus = mergeStatus.forall(_._2)
}
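    // Update the table status and fire the post-compaction events only if every merge task
    // succeeded.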
if (finalMergeStatus) {
val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
var segmentFilesForIUDCompact = new util.ArrayList[Segment]()
var segmentFileName: String = null
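      // For partitioned tables the per-partition segment files written during the merge are
      // combined into a single segment file; otherwise a new segment file is written below (one
      // per updated segment for IUD compaction, or a single file for the merged load).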
if (carbonTable.isHivePartitionTable) {
val readPath =
CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp + ".tmp"
// Merge all partition files into a single file.
segmentFileName =
mergedLoadNumber + "_" + carbonLoadModel.getFactTimeStamp
val segmentFile = SegmentFileStore
.mergeSegmentFiles(readPath,
segmentFileName,
CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath))
if (segmentFile != null) {
SegmentFileStore
.moveFromTempFolder(segmentFile,
carbonLoadModel.getFactTimeStamp + ".tmp",
carbonLoadModel.getTablePath)
}
segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
} else {
        // Get the segment file for each updated segment in case of IUD compaction
if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
val segmentFilesList = loadsToMerge.asScala.map { seg =>
val segmentMetaDataInfo = new SegmentFileStore(carbonLoadModel.getTablePath,
seg.getSegmentFile).getSegmentFile.getSegmentMetaDataInfo
val file = SegmentFileStore.writeSegmentFile(
carbonTable,
seg.getLoadName,
carbonLoadModel.getFactTimeStamp.toString,
segmentMetaDataInfo)
new Segment(seg.getLoadName, file)
}.filter(_.getSegmentFileName != null).asJava
segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList)
} else {
// get segmentMetadata info from accumulator
val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
mergedLoadNumber,
segmentMetaDataAccumulator)
segmentFileName = SegmentFileStore.writeSegmentFile(
carbonTable,
mergedLoadNumber,
carbonLoadModel.getFactTimeStamp.toString,
segmentMetaDataInfo)
}
}
// clear segmentMetaDataAccumulator
segmentMetaDataAccumulator.reset()
// Used to inform the commit listener that the commit is fired from compaction flow.
operationContext.setProperty("isCompaction", "true")
// trigger event for compaction
val alterTableCompactionPreStatusUpdateEvent =
AlterTableCompactionPreStatusUpdateEvent(sc.sparkSession,
carbonTable,
carbonMergerMapping,
carbonLoadModel,
mergedLoadName)
OperationListenerBus.getInstance
.fireEvent(alterTableCompactionPreStatusUpdateEvent, operationContext)
val endTime = System.nanoTime()
LOGGER.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
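      // For IUD_UPDDEL_DELTA compaction the update/delete delta merge status is written first;
      // otherwise (or if that update fails) the merged load is recorded in the table status and
      // the source loads are marked as compacted.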
val statusFileUpdation =
((compactionType == CompactionType.IUD_UPDDEL_DELTA) &&
CarbonDataMergerUtil
.updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
carbonTable.getMetadataPath,
carbonLoadModel,
segmentFilesForIUDCompact)) ||
CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus(
loadsToMerge,
carbonTable.getMetadataPath,
mergedLoadNumber,
carbonLoadModel,
compactionType,
segmentFileName,
MVManagerInSpark.get(sc.sparkSession))
if (compactionType != CompactionType.IUD_DELETE_DELTA &&
compactionType != CompactionType.IUD_UPDDEL_DELTA) {
MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)
}
val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(sc.sparkSession,
carbonTable,
carbonMergerMapping,
carbonLoadModel,
mergedLoadName)
OperationListenerBus.getInstance()
.fireEvent(compactionLoadStatusPostEvent, operationContext)
if (null != tableIndexes) {
val buildIndexPostExecutionEvent = new BuildIndexPostExecutionEvent(
sqlContext.sparkSession, carbonTable.getAbsoluteTableIdentifier,
null, Seq(mergedLoadNumber), true)
OperationListenerBus.getInstance()
.fireEvent(buildIndexPostExecutionEvent, indexOperationContext)
}
val commitDone = operationContext.getProperty("commitComplete")
val commitComplete = if (null != commitDone) {
commitDone.toString.toBoolean
} else {
true
}
      // Either of the conditions can be true: when a delete segment is fired after compaction
      // has started, statusFileUpdation will be false, but commitComplete can still be true
      // because compaction of all the indexes finishes together up to the maximum level possible
      // (level 1, 2 etc). So both conditions have to be checked.
if (!statusFileUpdation || !commitComplete) {
LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
s"${ carbonLoadModel.getTableName }")
throw new Exception(s"Compaction failed to update metadata for table" +
s" ${ carbonLoadModel.getDatabaseName }." +
s"${ carbonLoadModel.getTableName }")
} else {
LOGGER.info(s"Compaction request completed for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
// Prepriming index for compaction
val segmentsForPriming = if (compactionType.equals(CompactionType.IUD_DELETE_DELTA) ||
compactionType.equals(CompactionType.IUD_UPDDEL_DELTA)) {
validSegments.asScala.map(_.getSegmentNo).toList
} else if (compactionType.equals(CompactionType.MAJOR) ||
compactionType.equals(CompactionType.MINOR) ||
compactionType.equals(CompactionType.CUSTOM)) {
scala.List(mergedLoadNumber)
} else {
scala.List()
}
DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession,
carbonTable,
validSegments.asScala.map(_.getSegmentNo).toList,
operationContext,
FileFactory.getConfiguration,
segmentsForPriming)
}
} else {
LOGGER.error(s"Compaction request failed for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
      throw new Exception("Compaction failure in merger RDD.")
}
  }

/**
 * Compact the given segments by re-loading their data through the global sort flow.
*/
def compactSegmentsByGlobalSort(
sparkSession: SparkSession,
carbonLoadModel: CarbonLoadModel,
carbonMergerMapping: CarbonMergerMapping,
segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
: Array[(String, Boolean)] = {
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val splits = splitsOfSegments(
sparkSession,
table,
carbonMergerMapping.validSegments)
var loadResult: Array[(String, Boolean)] = null
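    // Restrict the input to the segments being compacted via the thread-local
    // carbon.input.segments property, then re-load that data as one new segment using global sort.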
try {
CarbonUtils
.threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName,
splits.asScala.map(s => s.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
val dataFrame = SparkSQLUtil.createInputDataFrame(
sparkSession,
table)
      // generate a LoadModel which can be used in the global_sort flow
val outputModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(
sparkSession, table)
// set fact time stamp, else the carbondata file will be created with fact timestamp as 0.
outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
outputModel.setLoadMetadataDetails(carbonLoadModel.getLoadMetadataDetails)
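      // The merged load name has the form "Segment_<id>"; the part after the underscore is the id
      // of the new compacted segment.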
outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
loadResult = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
sparkSession,
Option(dataFrame),
outputModel,
SparkSQLUtil.sessionState(sparkSession).newHadoopConf(),
segmentMetaDataAccumulator)
.map { row =>
(row._1, FailureCauses.NONE == row._2._2.failureCauses)
}
} finally {
CarbonUtils
.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
table.getDatabaseName + "." +
table.getTableName)
}
loadResult
  }

/**
 * Get the input splits of the specified segments.
*/
def splitsOfSegments(
sparkSession: SparkSession,
carbonTable: CarbonTable,
segments: Array[Segment]
): java.util.List[InputSplit] = {
val jobConf = new JobConf(SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
SparkHadoopUtil.get.addCredentials(jobConf)
val job = Job.getInstance(jobConf)
val conf = job.getConfiguration
CarbonInputFormat.setTablePath(conf, carbonTable.getTablePath)
CarbonInputFormat.setTableInfo(conf, carbonTable.getTableInfo)
CarbonInputFormat.setDatabaseName(conf, carbonTable.getDatabaseName)
CarbonInputFormat.setTableName(conf, carbonTable.getTableName)
CarbonInputFormat.setQuerySegment(conf, segments.map(_.getSegmentNo).mkString(","))
new CarbonTableInputFormat[Object].getSplits(job)
}
}