/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.rdd
import java.text.SimpleDateFormat
import java.util
import java.util.concurrent._
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.util.Random
import scala.util.control.Breaks._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.scan.partition.PartitionUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.events.{LoadTablePostExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException}
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.loading.sort.SortScopeOptions
import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
import org.apache.carbondata.spark.load._
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
/**
* This is the factory class which creates different RDDs depending on user needs.
*/
object CarbonDataRDDFactory {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
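/**
* Handles compaction when concurrent compaction is disabled: takes the system level
* compaction lock at the configured mdt file location and starts the compaction threads.
* If the lock cannot be acquired, a compaction-required file is recorded for the table and,
* for a DDL-triggered compaction, an exception is thrown.
*/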
def handleCompactionForSystemLocking(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
storeLocation: String,
compactionType: CompactionType,
carbonTable: CarbonTable,
compactionModel: CompactionModel): Unit = {
// taking system level lock at the mdt file location
var configuredMdtPath = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT).trim
configuredMdtPath = CarbonUtil.checkAndAppendFileSystemURIScheme(configuredMdtPath)
val lock = CarbonLockFactory
.getCarbonLockObj(configuredMdtPath + CarbonCommonConstants.FILE_SEPARATOR +
CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
LockUsage.SYSTEMLEVEL_COMPACTION_LOCK)
if (lock.lockWithRetries()) {
LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
s".${ carbonLoadModel.getTableName }")
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
storeLocation,
compactionModel,
lock
)
} catch {
case e: Exception =>
LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
lock.unlock()
// throw the exception only if the compaction is a blocking (DDL-triggered) call.
if (compactionModel.isDDLTrigger) {
throw e
}
}
} else {
LOGGER.audit("Not able to acquire the system level compaction lock for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error("Not able to acquire the compaction lock for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
CarbonCompactionUtil
.createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
// throw exception only in case of DDL trigger.
if (compactionModel.isDDLTrigger) {
CarbonException.analysisException(
s"Compaction is in progress, compaction request for table " +
s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}" +
" is in queue.")
} else {
LOGGER.error("Compaction is in progress, compaction request for table " +
s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}" +
" is in queue.")
}
}
}
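/**
* Runs the compaction for the requested table on a single-threaded executor. The compaction
* thread is invoked with run(), so the call blocks until compaction finishes. When concurrent
* compaction is disabled, it also keeps picking up other tables that have a pending
* compaction request and compacts them one by one, skipping tables whose request file
* cannot be deleted.
*/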
def startCompactionThreads(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
storeLocation: String,
compactionModel: CompactionModel,
compactionLock: ICarbonLock): Unit = {
val executor: ExecutorService = Executors.newFixedThreadPool(1)
// refresh the table status metadata in the load model. For IUD Update Delta Compaction
// the metadata is already filled in the LoadModel, so no refresh is needed.
if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
CommonUtil.readLoadMetadataDetails(carbonLoadModel)
}
val compactionThread = new Thread {
override def run(): Unit = {
try {
// compaction status of the table whose compaction was triggered by the user.
var triggeredCompactionStatus = false
var exception: Exception = null
try {
DataManagementFunc.executeCompaction(
carbonLoadModel,
compactionModel,
executor,
sqlContext,
storeLocation
)
triggeredCompactionStatus = true
} catch {
case e: Exception =>
LOGGER.error(s"Exception in compaction thread ${ e.getMessage }")
exception = e
}
// continue even if an exception occurred, and check all the remaining tables.
val isConcurrentCompactionAllowed = CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
).equalsIgnoreCase("true")
if (!isConcurrentCompactionAllowed) {
LOGGER.info("System level compaction lock is enabled.")
val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
var tableForCompaction = CarbonCompactionUtil.getNextTableToCompact(
CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
.listAllTables(sqlContext.sparkSession).toArray,
skipCompactionTables.toList.asJava)
while (null != tableForCompaction) {
LOGGER.info("Compaction request has been identified for table " +
s"${ tableForCompaction.getDatabaseName }." +
s"${ tableForCompaction.getTableName}")
val table: CarbonTable = tableForCompaction
val metadataPath = table.getMetaDataFilepath
val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
val newCarbonLoadModel = prepareCarbonLoadModel(table)
val compactionSize = CarbonDataMergerUtil
.getCompactionSize(CompactionType.MAJOR_COMPACTION)
val newCompactionModel = CompactionModel(compactionSize,
compactionType,
table,
compactionModel.isDDLTrigger
)
// proceed for compaction
try {
DataManagementFunc.executeCompaction(newCarbonLoadModel,
newCompactionModel,
executor, sqlContext, storeLocation
)
} catch {
case e: Exception =>
LOGGER.error("Exception in compaction thread for table " +
s"${ tableForCompaction.getDatabaseName }." +
s"${ tableForCompaction.getTableName }")
// not handling the exception; only logging, since this is not the table
// triggered by the user.
} finally {
// delete the compaction required file in both the failure and success cases.
if (!CarbonCompactionUtil
.deleteCompactionRequiredFile(metadataPath, compactionType)) {
// if the compaction request file could not be deleted, add the table's details to the
// skip list so that it won't be considered next time.
skipCompactionTables.+=:(tableForCompaction.getCarbonTableIdentifier)
LOGGER.error("Compaction request file can not be deleted for table " +
s"${ tableForCompaction.getDatabaseName }." +
s"${ tableForCompaction.getTableName }")
}
}
// check again for the next table with a pending compaction request.
tableForCompaction = CarbonCompactionUtil
.getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession)
.carbonMetastore.listAllTables(sqlContext.sparkSession).toArray,
skipCompactionTables.asJava
)
}
}
// surface the error to the user (e.g. in beeline) if the compaction the user
// triggered has failed.
if (!triggeredCompactionStatus) {
throw new Exception("Exception in compaction " + exception.getMessage)
}
} finally {
executor.shutdownNow()
DataManagementFunc.deletePartialLoadsInCompaction(carbonLoadModel)
compactionLock.unlock()
}
}
}
// calling the run method of the thread directly makes this a blocking call.
// in the future this may be made concurrent.
compactionThread.run()
}
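/**
* Builds a minimal CarbonLoadModel (schema, table and database name, table path, load
* metadata and fact timestamp) for a table picked up during system level compaction.
*/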
private def prepareCarbonLoadModel(
table: CarbonTable
): CarbonLoadModel = {
val loadModel = new CarbonLoadModel
loadModel.setTableName(table.getTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
loadModel.setCarbonDataLoadSchema(dataLoadSchema)
loadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
loadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
loadModel.setTablePath(table.getTablePath)
CommonUtil.readLoadMetadataDetails(loadModel)
val loadStartTime = CarbonUpdateUtil.readCurrentTime()
loadModel.setFactTimeStamp(loadStartTime)
loadModel
}
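/**
* Entry point for data loading. Depending on the table and the input it delegates to the
* update flow, the partition table loader, the global sort loader, the dataframe loader or
* the file loader, then updates the table status and triggers auto compaction
* (segment merging) if required.
*/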
def loadCarbonData(
sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
storePath: String,
columnar: Boolean,
partitionStatus: SegmentStatus = SegmentStatus.SUCCESS,
result: Option[DictionaryServer],
overwriteTable: Boolean,
hadoopConf: Configuration,
dataFrame: Option[DataFrame] = None,
updateModel: Option[UpdateTableModel] = None): Unit = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val operationContext = new OperationContext
// for handling of segment merging.
LOGGER.audit(s"Data load request has been received for table" +
s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
// Check if any loads need to be deleted before loading new data
DataManagementFunc.deleteLoadsAndUpdateMetadata(isForceDeletion = false, carbonTable)
var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
// create new segment folder in carbon store
if (updateModel.isEmpty) {
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
carbonLoadModel.getSegmentId, carbonTable)
}
var loadStatus = SegmentStatus.SUCCESS
var errorMessage: String = "DataLoad failure"
var executorMessage: String = ""
val isSortTable = carbonTable.getNumberOfSortColumns > 0
val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
try {
if (updateModel.isDefined) {
res = loadDataFrameForUpdate(
sqlContext,
dataFrame,
carbonLoadModel,
updateModel,
carbonTable)
res.foreach { resultOfSeg =>
resultOfSeg.foreach { resultOfBlock =>
if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
loadStatus = SegmentStatus.LOAD_FAILURE
if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
} else {
updateModel.get.executorErrors = resultOfBlock._2._2
}
} else if (resultOfBlock._2._1.getSegmentStatus ==
SegmentStatus.LOAD_PARTIAL_SUCCESS) {
loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
}
}
}
} else {
status = if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) {
loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
} else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
dataFrame, carbonLoadModel, hadoopConf)
} else if (dataFrame.isDefined) {
loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
} else {
loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
}
CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false)
val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
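// aggregate the per-task segment statuses: any failure marks the whole load as failed,
// and a partial success (without a failure) downgrades the overall status to partial success.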
if (status.nonEmpty) {
status.foreach { eachLoadStatus =>
val state = newStatusMap.get(eachLoadStatus._1)
state match {
case Some(SegmentStatus.LOAD_FAILURE) =>
newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
if eachLoadStatus._2._1.getSegmentStatus ==
SegmentStatus.SUCCESS =>
newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
case _ =>
newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
}
}
newStatusMap.foreach {
case (key, value) =>
if (value == SegmentStatus.LOAD_FAILURE) {
loadStatus = SegmentStatus.LOAD_FAILURE
} else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
loadStatus != SegmentStatus.LOAD_FAILURE) {
loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
}
}
} else {
loadStatus = SegmentStatus.LOAD_FAILURE
}
if (loadStatus != SegmentStatus.LOAD_FAILURE &&
partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
loadStatus = partitionStatus
}
}
} catch {
case ex: Throwable =>
loadStatus = SegmentStatus.LOAD_FAILURE
ex match {
case sparkException: SparkException =>
if (sparkException.getCause.isInstanceOf[DataLoadingException] ||
sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
executorMessage = sparkException.getCause.getMessage
errorMessage = errorMessage + ": " + executorMessage
}
case _ =>
if (ex.getCause != null) {
executorMessage = ex.getCause.getMessage
errorMessage = errorMessage + ": " + executorMessage
}
}
LOGGER.info(errorMessage)
LOGGER.error(ex)
}
// handle the status file update for the UPDATE command.
if (updateModel.isDefined) {
if (loadStatus == SegmentStatus.LOAD_FAILURE) {
if (updateModel.get.executorErrors.failureCauses == FailureCauses.NONE) {
updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
if (null != executorMessage && !executorMessage.isEmpty) {
updateModel.get.executorErrors.errorMsg = executorMessage
} else {
updateModel.get.executorErrors.errorMsg = "Update failed as the data load has failed."
}
}
return
} else if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
return
} else {
// in the success case, handle the update of the table status file.
val segmentDetails = new util.HashSet[String]()
var resultSize = 0
res.foreach { resultOfSeg =>
resultSize = resultSize + resultOfSeg.size
resultOfSeg.foreach { resultOfBlock =>
segmentDetails.add(resultOfBlock._2._1.getLoadName)
}
}
// the update does not have any records to update, so there is no need to update the
// table status file.
if (resultSize == 0) {
LOGGER.audit("Data update is successful with 0 rows updation for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
return
}
if (CarbonUpdateUtil.updateTableMetadataStatus(
segmentDetails,
carbonTable,
updateModel.get.updatedTimeStamp + "",
true,
new util.ArrayList[String](0))) {
LOGGER.audit("Data update is successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
} else {
val errorMessage = "Data update failed due to failure in table status updation."
LOGGER.audit("Data update is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error("Data update failed due to failure in table status updation.")
updateModel.get.executorErrors.errorMsg = errorMessage
updateModel.get.executorErrors.failureCauses = FailureCauses
.STATUS_FILE_UPDATION_FAILURE
return
}
}
return
}
if (loadStatus == SegmentStatus.LOAD_FAILURE) {
// update the load entry in table status file for changing the status to failure
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")
LOGGER.audit(s"Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.warn("Cannot write load metadata file as data load failed")
throw new Exception(errorMessage)
} else {
// check if the data load failed due to bad records and, if so, throw a data load
// failure exception
if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
// update the load entry in table status file for changing the status to failure
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")
LOGGER.audit(s"Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
throw new Exception(status(0)._2._2.errorMsg)
}
// if segment is empty then fail the data load
if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
// update the load entry in table status file for changing the status to failure
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")
LOGGER.audit(s"Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
" as there is no data to load")
LOGGER.warn("Cannot write load metadata file as data load failed")
throw new Exception("No Data to load")
}
writeDictionary(carbonLoadModel, result, writeAll = false)
// Fire an event so that registered listeners can execute tasks required before committing
// the load operation to the table status file
val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
LoadTablePostExecutionEvent(sqlContext.sparkSession,
carbonTable.getCarbonTableIdentifier,
carbonLoadModel)
OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
if (!done) {
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")
LOGGER.audit("Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error("Data load failed due to failure in table status updation.")
throw new Exception("Data load failed due to failure in table status updation.")
}
if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
LOGGER.audit("Data load is partially successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
} else {
LOGGER.audit("Data load is successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
try {
// compaction handling
handleSegmentMerging(sqlContext, carbonLoadModel, carbonTable)
} catch {
case e: Exception =>
throw new Exception(
"Dataload is success. Auto-Compaction has failed. Please check logs.")
}
}
}
/**
* If the data load is triggered by an UPDATE query, this function executes the update.
* TODO: move it to a separate update command
*/
private def loadDataFrameForUpdate(
sqlContext: SQLContext,
dataFrame: Option[DataFrame],
carbonLoadModel: CarbonLoadModel,
updateModel: Option[UpdateTableModel],
carbonTable: CarbonTable
): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate
val updateRdd = dataFrame.get.rdd
// return directly if no rows to update
val noRowsToUpdate = updateRdd.isEmpty()
if (noRowsToUpdate) {
Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]()
} else {
// splitting as (key, value) i.e., (segment, updatedRows)
val keyRDD = updateRdd.map(row =>
(row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
carbonTable.getMetaDataFilepath)
val segmentIds = loadMetadataDetails.map(_.getLoadName)
val segmentIdIndex = segmentIds.zipWithIndex.toMap
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getTablePath,
carbonTable.getCarbonTableIdentifier)
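// the latest task id written in each segment; new files created by this update get task
// numbers greater than this value so that they do not clash with existing ones.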
val segmentId2maxTaskNo = segmentIds.map { segId =>
(segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath))
}.toMap
class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
extends org.apache.spark.Partitioner {
override def numPartitions: Int = segmentIdIndex.size * parallelism
override def getPartition(key: Any): Int = {
val segId = key.asInstanceOf[String]
// partitionId = segmentIdIndex * parallelism + a random offset in [0, parallelism)
segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
}
}
val partitionByRdd = keyRDD.partitionBy(
new SegmentPartitioner(segmentIdIndex, segmentUpdateParallelism))
// since partitionId = segmentIdIndex * parallelism + randomPart and randomPart < parallelism,
// segmentIdIndex = partitionId / parallelism (integer division).
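// for example, with parallelism = 4, partitionId 9 maps to segIdIndex = 9 / 4 = 2 and
// randomPart = 9 - 2 * 4 = 1.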
partitionByRdd.map(_._2).mapPartitions { partition =>
val partitionId = TaskContext.getPartitionId()
val segIdIndex = partitionId / segmentUpdateParallelism
val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
val segId = segmentIds(segIdIndex)
val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1
List(triggerDataLoadForSegment(carbonLoadModel, updateModel, segId, newTaskNo, partition)
.toList).toIterator
}.collect()
}
}
/**
* TODO: move it to a separate update command
*/
private def triggerDataLoadForSegment(
carbonLoadModel: CarbonLoadModel,
updateModel: Option[UpdateTableModel],
key: String,
taskNo: Int,
iter: Iterator[Row]
): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
val rddResult = new updateResultImpl()
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
var partitionID = "0"
val loadMetadataDetails = new LoadMetadataDetails
val executionErrors = ExecutionErrors(FailureCauses.NONE, "")
var uniqueLoadStatusId = ""
try {
val segId = key
val index = taskNo
uniqueLoadStatusId = carbonLoadModel.getTableName +
CarbonCommonConstants.UNDERSCORE +
(index + "_0")
loadMetadataDetails.setPartitionCount(partitionID)
loadMetadataDetails.setLoadName(segId)
loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
carbonLoadModel.setPartitionId(partitionID)
carbonLoadModel.setSegmentId(segId)
carbonLoadModel.setTaskNo(String.valueOf(index))
carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
UpdateDataLoad.DataLoadForUpdate(segId,
index,
iter,
carbonLoadModel,
loadMetadataDetails)
} catch {
case e: NoRetryException =>
loadMetadataDetails
.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
executionErrors.failureCauses = FailureCauses.BAD_RECORDS
executionErrors.errorMsg = e.getMessage
LOGGER.info("Bad Record Found")
case e: Exception =>
LOGGER.info("DataLoad failure")
LOGGER.error(e)
throw e
}
var finished = false
override def hasNext: Boolean = !finished
override def next(): (String, (LoadMetadataDetails, ExecutionErrors)) = {
finished = true
rddResult
.getKey(uniqueLoadStatusId,
(loadMetadataDetails, executionErrors))
}
}
resultIter
}
/**
* Trigger to write dictionary files
*/
private def writeDictionary(carbonLoadModel: CarbonLoadModel,
result: Option[DictionaryServer], writeAll: Boolean): Unit = {
// write dictionary file
val uniqueTableName: String = s"${ carbonLoadModel.getDatabaseName }_${
carbonLoadModel.getTableName
}"
result match {
case Some(server) =>
try {
server.writeTableDictionary(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
.getCarbonTableIdentifier.getTableId)
} catch {
case _: Exception =>
LOGGER.error(s"Error while writing dictionary file for $uniqueTableName")
throw new Exception("Dataload failed due to error while writing dictionary file!")
}
case _ =>
}
}
/**
* Trigger compaction after data load
*/
private def handleSegmentMerging(
sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
carbonTable: CarbonTable
): Unit = {
LOGGER.info(s"compaction need status is" +
s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
LOGGER.audit(s"Compaction request received for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val compactionSize = 0
val isCompactionTriggerByDDl = false
val compactionModel = CompactionModel(compactionSize,
CompactionType.MINOR_COMPACTION,
carbonTable,
isCompactionTriggerByDDl
)
var storeLocation = ""
val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
if (null != configuredStore && configuredStore.nonEmpty) {
storeLocation = configuredStore(Random.nextInt(configuredStore.length))
}
if (storeLocation == null) {
storeLocation = System.getProperty("java.io.tmpdir")
}
storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
val isConcurrentCompactionAllowed = CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
).equalsIgnoreCase("true")
if (!isConcurrentCompactionAllowed) {
handleCompactionForSystemLocking(sqlContext,
carbonLoadModel,
storeLocation,
CompactionType.MINOR_COMPACTION,
carbonTable,
compactionModel
)
} else {
val lock = CarbonLockFactory.getCarbonLockObj(
carbonTable.getAbsoluteTableIdentifier,
LockUsage.COMPACTION_LOCK)
if (lock.lockWithRetries()) {
LOGGER.info("Acquired the compaction lock.")
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
storeLocation,
compactionModel,
lock
)
} catch {
case e: Exception =>
LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
lock.unlock()
throw e
}
} else {
LOGGER.audit("Not able to acquire the compaction lock for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
LOGGER.error("Not able to acquire the compaction lock for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
}
}
}
}
/**
* Update table status file after data loading
*/
private def updateTableStatus(
status: Array[(String, (LoadMetadataDetails, ExecutionErrors))],
carbonLoadModel: CarbonLoadModel,
loadStatus: SegmentStatus,
overwriteTable: Boolean
): Boolean = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val metadataDetails = if (status != null && status(0) != null) {
status(0)._2._1
} else {
new LoadMetadataDetails
}
CarbonLoaderUtil.populateNewLoadMetaEntry(
metadataDetails,
loadStatus,
carbonLoadModel.getFactTimeStamp,
true)
CarbonUtil
.addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable)
val done = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, carbonLoadModel, false,
overwriteTable)
if (!done) {
val errorMessage = "Dataload failed due to failure in table status updation."
LOGGER.audit("Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error("Dataload failed due to failure in table status updation.")
throw new Exception(errorMessage)
} else if (!carbonLoadModel.isRetentionRequest) {
// TODO : Handle it
LOGGER.info("********Database updated**********")
}
done
}
/**
* Repartition the input data for a partition table.
*/
private def repartitionInputData(
sqlContext: SQLContext,
dataFrame: Option[DataFrame],
carbonLoadModel: CarbonLoadModel,
hadoopConf: Configuration): RDD[Row] = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName)
val partitionColumn = partitionInfo.getColumnSchemaList.get(0).getColumnName
val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
val columns = carbonLoadModel.getCsvHeaderColumns
var partitionColumnIndex = -1
breakable {
for (i <- 0 until columns.length) {
if (partitionColumn.equalsIgnoreCase(columns(i))) {
partitionColumnIndex = i
break
}
}
}
if (partitionColumnIndex == -1) {
throw new DataLoadingException("Partition column not found.")
}
val specificTimestampFormat = carbonLoadModel.getTimestampformat
val specificDateFormat = carbonLoadModel.getDateFormat
val timeStampFormat =
if (specificTimestampFormat != null && !specificTimestampFormat.trim.isEmpty) {
new SimpleDateFormat(specificTimestampFormat)
} else {
val timestampFormatString = CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
new SimpleDateFormat(timestampFormatString)
}
val dateFormat = if (specificDateFormat != null && !specificDateFormat.trim.isEmpty) {
new SimpleDateFormat(specificDateFormat)
} else {
val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
new SimpleDateFormat(dateFormatString)
}
// generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
// input data from DataFrame
val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
dataFrame.get.rdd.map { row =>
if (null != row && row.length > partitionColumnIndex &&
null != row.get(partitionColumnIndex)) {
(CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
} else {
(null, row)
}
}
} else {
// input data from csv files
CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
val columnCount = columns.length
val jobConf = new JobConf(hadoopConf)
SparkHadoopUtil.get.addCredentials(jobConf)
new NewHadoopRDD[NullWritable, StringArrayWritable](
sqlContext.sparkContext,
classOf[CSVInputFormat],
classOf[NullWritable],
classOf[StringArrayWritable],
jobConf
).map { currentRow =>
if (null == currentRow || null == currentRow._2) {
val row = new StringArrayRow(new Array[String](columnCount))
(null, row)
} else {
val row = new StringArrayRow(new Array[String](columnCount))
val values = currentRow._2.get()
if (values != null && values.length > partitionColumnIndex) {
(currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
} else {
(null, row.setValues(currentRow._2.get()))
}
}
}
}
val partitioner = PartitionFactory.getPartitioner(partitionInfo)
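// for range partitions on string columns the key is converted to bytes first, presumably so
// that the range partitioner can compare keys in binary order; for non-string columns the
// key is converted to its typed value before partitioning.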
if (partitionColumnDataType == DataTypes.STRING) {
if (partitionInfo.getPartitionType == PartitionType.RANGE) {
inputRDD.map { row => (ByteUtil.toBytes(row._1), row._2) }
.partitionBy(partitioner)
.map(_._2)
} else {
inputRDD.partitionBy(partitioner)
.map(_._2)
}
} else {
inputRDD.map { row =>
(PartitionUtil.getDataBasedOnDataType(row._1, partitionColumnDataType, timeStampFormat,
dateFormat), row._2)
}
.partitionBy(partitioner)
.map(_._2)
}
}
/**
* Execute load process for partition table
*/
private def loadDataForPartitionTable(
sqlContext: SQLContext,
dataFrame: Option[DataFrame],
carbonLoadModel: CarbonLoadModel,
hadoopConf: Configuration
): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
try {
val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel, hadoopConf)
new PartitionTableDataLoaderRDD(
sqlContext.sparkContext,
new DataLoadResultImpl(),
carbonLoadModel,
rdd
).collect()
} catch {
case ex: Exception =>
LOGGER.error(ex, "load data failed for partition table")
throw ex
}
}
/**
* Execute load process to load from input dataframe
*/
private def loadDataFrame(
sqlContext: SQLContext,
dataFrame: Option[DataFrame],
carbonLoadModel: CarbonLoadModel
): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
try {
val rdd = dataFrame.get.rdd
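// count the distinct hosts that hold partitions of the input dataframe, so that at least
// that many executors are requested before coalescing the load.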
val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
}.distinct.length
val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
nodeNumOfData,
sqlContext.sparkContext)
val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
new NewDataFrameLoaderRDD(
sqlContext.sparkContext,
new DataLoadResultImpl(),
carbonLoadModel,
newRdd
).collect()
} catch {
case ex: Exception =>
LOGGER.error(ex, "load data frame failed")
throw ex
}
}
/**
* Execute load process to load from input file path specified in `carbonLoadModel`
*/
private def loadDataFile(
sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
hadoopConf: Configuration
): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
/*
* when the data load is handled by node partition:
* 1) clone the hadoop configuration and set the file path in the configuration
* 2) use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get the splits and size info
* 3) use CarbonLoaderUtil.nodeBlockMapping to get the node-to-block mapping,
*    so that carbondata files (one file per block) are written locally on the nodes
* 4) use NewCarbonDataLoadRDD to load the data and write the carbondata files
*/
// FileUtils skips files that are not CSV and returns all file paths, separated by ','
val filePaths = carbonLoadModel.getFactFilePath
hadoopConf.set(FileInputFormat.INPUT_DIR, filePaths)
hadoopConf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
hadoopConf.set("io.compression.codecs",
"""org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConf)
val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val jobContext = new Job(hadoopConf)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val blockList = rawSplits.map { inputSplit =>
val fileSplit = inputSplit.asInstanceOf[FileSplit]
new TableBlockInfo(fileSplit.getPath.toString,
fileSplit.getStart, "1",
fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null
).asInstanceOf[Distributable]
}
// group blocks to nodes, tasks
val startTime = System.currentTimeMillis
val activeNodes = DistributionUtil
.ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
val nodeBlockMapping =
CarbonLoaderUtil
.nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
.toSeq
val timeElapsed: Long = System.currentTimeMillis - startTime
LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
s"No.of Nodes: ${nodeBlockMapping.size}")
var str = ""
nodeBlockMapping.foreach { entry =>
val tableBlock = entry._2
str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size()
tableBlock.asScala.foreach(tableBlockInfo =>
if (!tableBlockInfo.getLocations.exists(hostentry =>
hostentry.equalsIgnoreCase(entry._1)
)) {
str = str + " , mismatch locations: " + tableBlockInfo.getLocations
.foldLeft("")((a, b) => a + "," + b)
}
)
str = str + "\n"
}
LOGGER.info(str)
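// convert each node's Distributable blocks into BlockDetails (path, offset, length,
// locations) in the form expected by NewCarbonDataLoadRDD.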
val blocksGroupBy: Array[(String, Array[BlockDetails])] = nodeBlockMapping.map { entry =>
val blockDetailsList =
entry._2.asScala.map(distributable => {
val tableBlock = distributable.asInstanceOf[TableBlockInfo]
new BlockDetails(new Path(tableBlock.getFilePath),
tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
)
}).toArray
(entry._1, blockDetailsList)
}.toArray
new NewCarbonDataLoadRDD(
sqlContext.sparkContext,
new DataLoadResultImpl(),
carbonLoadModel,
blocksGroupBy,
hadoopConf
).collect()
}
}