/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.spark.rdd

import java.util
import java.util.List
import java.util.concurrent.ExecutorService

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{InputSplit, Job}
import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
import org.apache.spark.util.MergeIndexUtil
import org.apache.spark.CarbonInputMetrics
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.GenericRow

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.metadata.datatype.{StructField, StructType}
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.events._
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat, CarbonTableOutputFormat}
import org.apache.carbondata.hadoop.CarbonProjection
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.util.TableOptionConstant
import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
import org.apache.carbondata.spark.MergeResultImpl
import org.apache.carbondata.store.CarbonRowReadSupport

/**
* This class is used to perform compaction on a carbon table.
*/
class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
compactionModel: CompactionModel,
executor: ExecutorService,
sqlContext: SQLContext,
storeLocation: String,
compactedSegments: List[String],
operationContext: OperationContext)
extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
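
/**
* Returns true when the single segment selected for CUSTOM compaction has to be
* re-sorted because it is not sorted by the table's current sort_columns.
*/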
private def needSortSingleSegment(
loadsToMerge: java.util.List[LoadMetadataDetails]): Boolean = {
// Support re-sorting an old segment that was sorted with stale sort_columns
if (CompactionType.CUSTOM == compactionModel.compactionType &&
loadsToMerge.size() == 1 &&
SortScope.NO_SORT != compactionModel.carbonTable.getSortScope) {
!CarbonCompactionUtil.isSortedByCurrentSortColumns(
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
loadsToMerge.get(0),
FileFactory.getConfiguration)
} else {
false
}
}
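
/**
* Repeatedly identifies the segments to be merged and submits compaction jobs
* until nothing more qualifies. IUD_UPDDEL_DELTA and CUSTOM compactions run a
* single pass, and MAJOR compaction excludes segments added after it started.
*/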
override def executeCompaction(): Unit = {
val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
carbonLoadModel.getLoadMetadataDetails
)
CarbonDataMergerUtil.sortSegments(sortedSegments)
var loadsToMerge = identifySegmentsToBeMerged()
while (loadsToMerge.size() > 1 || needSortSingleSegment(loadsToMerge) ||
(CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
loadsToMerge.size() > 0)) {
val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
deletePartialLoadsInCompaction()
try {
scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments)
} catch {
case e: Exception =>
LOGGER.error(s"Exception in compaction thread ${ e.getMessage }", e)
throw e
}
// Scan again and determine whether anything else remains to be merged.
carbonLoadModel.readAndSetLoadMetadataDetails()
var segList = carbonLoadModel.getLoadMetadataDetails
// In case of major compaction, scan only once and come out, excluding the
// newly added segments; otherwise it would keep doing major compaction on
// the new loads as well.
if (CompactionType.MAJOR == compactionModel.compactionType) {
segList = CarbonDataMergerUtil
.filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
}
if (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType ||
CompactionType.CUSTOM == compactionModel.compactionType) {
loadsToMerge.clear()
} else if (segList.size > 0) {
loadsToMerge = identifySegmentsToBeMerged()
if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
}
} else {
loadsToMerge.clear()
}
}
}

/**
* Submits the loads to be merged to the executor.
*/
def scanSegmentsAndSubmitJob(loadsToMerge: util.List[LoadMetadataDetails],
compactedSegments: List[String]): Unit = {
loadsToMerge.asScala.foreach { seg =>
LOGGER.info("loads identified for merge is " + seg.getLoadName)
}
val compactionCallableModel = CompactionCallableModel(
carbonLoadModel,
compactionModel.carbonTable,
loadsToMerge,
sqlContext,
compactionModel.compactionType,
compactionModel.currentPartitions,
compactedSegments)
triggerCompaction(compactionCallableModel)
}
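
/**
* Runs the actual merge for the identified loads: builds the CarbonMergerMapping,
* fires the pre- and post-compaction events, executes the appropriate merger
* flow and, on success, writes the segment file and updates the load metadata.
*/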
private def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
val carbonTable = compactionCallableModel.carbonTable
val loadsToMerge = compactionCallableModel.loadsToMerge
val sc = compactionCallableModel.sqlContext
val carbonLoadModel = compactionCallableModel.carbonLoadModel
val compactionType = compactionCallableModel.compactionType
val partitions = compactionCallableModel.currentPartitions
val tablePath = carbonLoadModel.getTablePath
val startTime = System.nanoTime()
val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
val mergedLoads = compactionCallableModel.compactedSegments
mergedLoads.add(mergedLoadName)
var finalMergeStatus = false
val databaseName: String = carbonLoadModel.getDatabaseName
val factTableName = carbonLoadModel.getTableName
val validSegments: List[Segment] = CarbonDataMergerUtil.getValidSegments(loadsToMerge)
val carbonMergerMapping = CarbonMergerMapping(
tablePath,
carbonTable.getMetadataPath,
mergedLoadName,
databaseName,
factTableName,
validSegments.asScala.toArray,
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
compactionType,
maxSegmentColCardinality = null,
maxSegmentColumnSchemaList = null,
currentPartitions = partitions)
carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
carbonLoadModel.setLoadMetadataDetails(
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath).toList.asJava)
// trigger event for compaction
val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
AlterTableCompactionPreEvent(sqlContext.sparkSession,
carbonTable,
carbonMergerMapping,
mergedLoadName)
OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
// Fire the pre-execution event for index datamaps
val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
val dataMapOperationContext = new OperationContext()
if (null != tableDataMaps) {
val dataMapNames: mutable.Buffer[String] =
tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
val dataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
new BuildDataMapPreExecutionEvent(sqlContext.sparkSession,
carbonTable.getAbsoluteTableIdentifier, dataMapNames)
OperationListenerBus.getInstance().fireEvent(dataMapPreExecutionEvent,
dataMapOperationContext)
}
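// Choose the merge flow: a dedicated merger RDD for IUD update/delete delta
// compaction, a global-sort load for global-sort tables without a range
// column, or the standard merger RDD otherwise.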
val mergeStatus =
if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
new CarbonIUDMergerRDD(
sc.sparkSession,
new MergeResultImpl(),
carbonLoadModel,
carbonMergerMapping
).collect
} else if (SortScope.GLOBAL_SORT == carbonTable.getSortScope &&
!carbonTable.getSortColumns.isEmpty &&
carbonTable.getRangeColumn == null &&
CarbonUtil.isStandardCarbonTable(carbonTable)) {
compactSegmentsByGlobalSort(sc.sparkSession, carbonLoadModel, carbonMergerMapping)
} else {
new CarbonMergerRDD(
sc.sparkSession,
new MergeResultImpl(),
carbonLoadModel,
carbonMergerMapping
).collect
}
if (mergeStatus.length == 0) {
finalMergeStatus = false
} else {
finalMergeStatus = mergeStatus.forall(_._2)
}
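// On a successful merge, write the segment file for the merged load, update
// the load metadata status and fire the post-status-update events.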
if (finalMergeStatus) {
val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
var segmentFilesForIUDCompact = new util.ArrayList[Segment]()
var segmentFileName: String = null
if (carbonTable.isHivePartitionTable) {
val readPath =
CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp + ".tmp"
// Merge all partition files into a single file.
segmentFileName =
mergedLoadNumber + "_" + carbonLoadModel.getFactTimeStamp
val segmentFile = SegmentFileStore
.mergeSegmentFiles(readPath,
segmentFileName,
CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath))
if (segmentFile != null) {
SegmentFileStore
.moveFromTempFolder(segmentFile,
carbonLoadModel.getFactTimeStamp + ".tmp",
carbonLoadModel.getTablePath)
}
segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
} else {
// Get the segment files for each updated segment in case of IUD compaction
if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
val segmentFilesList = loadsToMerge.asScala.map{seg =>
val file = SegmentFileStore.writeSegmentFile(
carbonTable,
seg.getLoadName,
carbonLoadModel.getFactTimeStamp.toString)
new Segment(seg.getLoadName, file)
}.filter(_.getSegmentFileName != null).asJava
segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList)
} else {
segmentFileName = SegmentFileStore.writeSegmentFile(
carbonTable,
mergedLoadNumber,
carbonLoadModel.getFactTimeStamp.toString)
}
}
// Used to inform the commit listener that the commit is fired from compaction flow.
operationContext.setProperty("isCompaction", "true")
// trigger event for compaction
val alterTableCompactionPreStatusUpdateEvent =
AlterTableCompactionPreStatusUpdateEvent(sc.sparkSession,
carbonTable,
carbonMergerMapping,
carbonLoadModel,
mergedLoadName)
OperationListenerBus.getInstance
.fireEvent(alterTableCompactionPreStatusUpdateEvent, operationContext)
val endTime = System.nanoTime()
LOGGER.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
val statusFileUpdation =
((compactionType == CompactionType.IUD_UPDDEL_DELTA) &&
CarbonDataMergerUtil
.updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
carbonTable.getMetadataPath,
carbonLoadModel,
segmentFilesForIUDCompact)) ||
CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus(
loadsToMerge,
carbonTable.getMetadataPath,
mergedLoadNumber,
carbonLoadModel,
compactionType,
segmentFileName)
if (compactionType != CompactionType.IUD_DELETE_DELTA &&
compactionType != CompactionType.IUD_UPDDEL_DELTA) {
MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)
}
val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(sc.sparkSession,
carbonTable,
carbonMergerMapping,
carbonLoadModel,
mergedLoadName)
OperationListenerBus.getInstance()
.fireEvent(compactionLoadStatusPostEvent, operationContext)
if (null != tableDataMaps) {
val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(
sqlContext.sparkSession, carbonTable.getAbsoluteTableIdentifier,
null, Seq(mergedLoadNumber), true)
OperationListenerBus.getInstance()
.fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
}
val commitDone = operationContext.getProperty("commitComplete")
val commitComplete = if (null != commitDone) {
commitDone.toString.toBoolean
} else {
true
}
// Either of these conditions can fail independently: when a delete segment
// command is fired after compaction has started, statusFileUpdation will be
// false, but commitComplete can still be true because compaction for all
// datamaps is finished together up to the maximum level possible
// (level 1, 2, etc.). So both conditions must be checked.
if (!statusFileUpdation || !commitComplete) {
LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
s"${ carbonLoadModel.getTableName }")
throw new Exception(s"Compaction failed to update metadata for table" +
s" ${ carbonLoadModel.getDatabaseName }." +
s"${ carbonLoadModel.getTableName }")
} else {
LOGGER.info(s"Compaction request completed for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
} else {
LOGGER.error(s"Compaction request failed for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
throw new Exception("Compaction Failure in Merger Rdd.")
}
}

/**
* Compact the segments using the global sort flow.
*/
def compactSegmentsByGlobalSort(
sparkSession: SparkSession,
carbonLoadModel: CarbonLoadModel,
carbonMergerMapping: CarbonMergerMapping): Array[(String, Boolean)] = {
val dataFrame = dataFrameOfSegments(
sparkSession,
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
carbonMergerMapping.validSegments)
// Generate a LoadModel which can be used in the global_sort flow
val outputModel = getLoadModelForGlobalSort(
sparkSession, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
carbonMergerMapping.validSegments)
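// mergedLoadName is expected to be of the form Segment_<id>; extract the
// merged segment id from it.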
outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
sparkSession,
Option(dataFrame),
outputModel,
SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
.map { row =>
(row._1, FailureCauses.NONE == row._2._2.failureCauses)
}
}

/**
* Create a DataFrame based on the specified segments.
*/
def dataFrameOfSegments(
sparkSession: SparkSession,
carbonTable: CarbonTable,
segments: Array[Segment]
): DataFrame = {
val columns = carbonTable
.getCreateOrderColumn(carbonTable.getTableName)
.asScala
.map(_.getColName)
.toArray
val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
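// Scan the given segments and convert each CarbonRow into a Spark Row.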
val rdd: RDD[Row] = new CarbonScanRDD[CarbonRow](
sparkSession,
columnProjection = new CarbonProjection(columns),
null,
carbonTable.getAbsoluteTableIdentifier,
carbonTable.getTableInfo.serialize,
carbonTable.getTableInfo,
new CarbonInputMetrics,
null,
null,
classOf[CarbonRowReadSupport],
splitsOfSegments(sparkSession, carbonTable, segments))
.map { row =>
new GenericRow(row.getData.asInstanceOf[Array[Any]])
}
sparkSession.createDataFrame(rdd, schema)
}

/**
* Get the input splits of the specified segments.
*/
def splitsOfSegments(
sparkSession: SparkSession,
carbonTable: CarbonTable,
segments: Array[Segment]
): java.util.List[InputSplit] = {
val jobConf = new JobConf(SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
SparkHadoopUtil.get.addCredentials(jobConf)
val job = Job.getInstance(jobConf)
val conf = job.getConfiguration
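// Configure the input format for this table and restrict the scan to the
// given segments.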
CarbonInputFormat.setTablePath(conf, carbonTable.getTablePath)
CarbonInputFormat.setTableInfo(conf, carbonTable.getTableInfo)
CarbonInputFormat.setDatabaseName(conf, carbonTable.getDatabaseName)
CarbonInputFormat.setTableName(conf, carbonTable.getTableName)
CarbonInputFormat.setQuerySegment(conf, segments.map(_.getSegmentNo).mkString(","))
new CarbonTableInputFormat[Object].getSplits(job)
}

/**
* Create a CarbonLoadModel for global_sort compaction.
*/
def getLoadModelForGlobalSort(
sparkSession: SparkSession,
carbonTable: CarbonTable,
segments: Array[Segment]
): CarbonLoadModel = {
val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
CarbonTableOutputFormat.setDatabaseName(conf, carbonTable.getDatabaseName)
CarbonTableOutputFormat.setTableName(conf, carbonTable.getTableName)
CarbonTableOutputFormat.setCarbonTable(conf, carbonTable)
val fieldList = carbonTable
.getCreateOrderColumn(carbonTable.getTableName)
.asScala
.map { column =>
new StructField(column.getColName, column.getDataType)
}
CarbonTableOutputFormat.setInputSchema(conf, new StructType(fieldList.asJava))
val loadModel = CarbonTableOutputFormat.getLoadModel(conf)
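// Rows being compacted were already validated during the original data load,
// so bad-record handling is disabled for the compaction load.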
loadModel.setSerializationNullFormat(
TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + ",\\N")
loadModel.setBadRecordsLoggerEnable(
TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + ",false")
loadModel.setBadRecordsAction(
TableOptionConstant.BAD_RECORDS_ACTION.getName() + ",force")
loadModel.setIsEmptyDataBadRecord(
DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + ",false")
val globalSortPartitions =
carbonTable.getTableInfo.getFactTable.getTableProperties.get("global_sort_partitions")
if (globalSortPartitions != null) {
loadModel.setGlobalSortPartitions(globalSortPartitions)
}
loadModel
}
}