| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.spark.rdd |
| |
| import java.io.File |
| import java.util |
| import java.util.concurrent._ |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable.ListBuffer |
| import scala.util.Random |
| |
| import org.apache.commons.lang3.StringUtils |
| import org.apache.hadoop.conf.Configuration |
| import org.apache.hadoop.fs.Path |
| import org.apache.hadoop.mapred.JobConf |
| import org.apache.hadoop.mapreduce.Job |
| import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} |
| import org.apache.spark.{SparkEnv, TaskContext} |
| import org.apache.spark.deploy.SparkHadoopUtil |
| import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer} |
| import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext} |
| import org.apache.spark.sql.catalyst.TableIdentifier |
| import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel} |
| import org.apache.spark.sql.hive.DistributionUtil |
| import org.apache.spark.sql.optimizer.CarbonFilters |
| import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil} |
| |
| import org.apache.carbondata.common.constants.LoggerAction |
| import org.apache.carbondata.common.logging.LogServiceFactory |
| import org.apache.carbondata.core.constants.{CarbonCommonConstants, SortScopeOptions} |
| import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} |
| import org.apache.carbondata.core.datamap.status.DataMapStatusManager |
| import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo} |
| import org.apache.carbondata.core.datastore.compression.CompressorFactory |
| import org.apache.carbondata.core.datastore.filesystem.CarbonFile |
| import org.apache.carbondata.core.datastore.impl.FileFactory |
| import org.apache.carbondata.core.dictionary.server.DictionaryServer |
| import org.apache.carbondata.core.exception.ConcurrentOperationException |
| import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} |
| import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore} |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable |
| import org.apache.carbondata.core.mutate.CarbonUpdateUtil |
| import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} |
| import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo} |
| import org.apache.carbondata.core.util.path.CarbonTablePath |
| import org.apache.carbondata.events.{OperationContext, OperationListenerBus} |
| import org.apache.carbondata.indexserver.{DistributedRDDUtils, IndexServer} |
| import org.apache.carbondata.processing.loading.FailureCauses |
| import org.apache.carbondata.processing.loading.csvinput.BlockDetails |
| import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent} |
| import org.apache.carbondata.processing.loading.exception.NoRetryException |
| import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} |
| import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} |
| import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} |
| import org.apache.carbondata.spark.{DataLoadResultImpl, _} |
| import org.apache.carbondata.spark.load._ |
| import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util} |
| |
| /** |
| * This is the factory class which can create different RDD depends on user needs. |
| * |
| */ |
| object CarbonDataRDDFactory { |
| |
| private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) |
| |
  /**
   * Triggers compaction after acquiring the system-level compaction lock kept under the
   * configured MDT sync folder. If the lock cannot be acquired, a compaction-required marker
   * file is written to the table's metadata path so the request is processed later instead
   * of being dropped.
   *
   * @param sqlContext        spark sql context
   * @param carbonLoadModel   load model of the table that triggered the compaction
   * @param storeLocation     temporary store location used during compaction
   * @param compactionType    type of the requested compaction
   * @param carbonTable       table to be compacted
   * @param compactedSegments out parameter: filled with the segment ids produced by compaction
   * @param compactionModel   model describing this compaction request (incl. DDL-trigger flag)
   * @param operationContext  event/listener context for pre/post compaction events
   */
  def handleCompactionForSystemLocking(sqlContext: SQLContext,
      carbonLoadModel: CarbonLoadModel,
      storeLocation: String,
      compactionType: CompactionType,
      carbonTable: CarbonTable,
      compactedSegments: java.util.List[String],
      compactionModel: CompactionModel,
      operationContext: OperationContext): Unit = {
    // taking system level lock at the mdt file location
    var configuredMdtPath = CarbonProperties.getInstance().getProperty(
      CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
      CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT).trim

    configuredMdtPath = CarbonUtil.checkAndAppendFileSystemURIScheme(configuredMdtPath)
    val lock = CarbonLockFactory.getSystemLevelCarbonLockObj(
      configuredMdtPath + CarbonCommonConstants.FILE_SEPARATOR +
        CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
      LockUsage.SYSTEMLEVEL_COMPACTION_LOCK)

    if (lock.lockWithRetries()) {
      LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
        s".${ carbonLoadModel.getTableName }")
      try {
        // NOTE: on the success path the lock is released inside the compaction thread's
        // finally block (see startCompactionThreads), not here.
        startCompactionThreads(
          sqlContext,
          carbonLoadModel,
          storeLocation,
          compactionModel,
          lock,
          compactedSegments,
          operationContext
        )
      } catch {
        case e: Exception =>
          LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
          lock.unlock()
          // if the compaction is a blocking call then only need to throw the exception.
          if (compactionModel.isDDLTrigger) {
            throw e
          }
      }
    } else {
      LOGGER.error("Not able to acquire the compaction lock for table " +
        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
      // leave a marker file so a later compaction run picks this request up
      CarbonCompactionUtil
        .createCompactionRequiredFile(carbonTable.getMetadataPath, compactionType)
      // throw exception only in case of DDL trigger.
      if (compactionModel.isDDLTrigger) {
        CarbonException.analysisException(
          s"Compaction is in progress, compaction request for table " +
          s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}" +
          " is in queue.")
      } else {
        LOGGER.error("Compaction is in progress, compaction request for table " +
          s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}" +
          " is in queue.")
      }
    }
  }
| |
  /**
   * Executes compaction for the triggering table and, when concurrent compaction is disabled,
   * drains compaction-required requests of other tables as well. Although a Thread object is
   * created, its run() method is invoked directly, so the whole call is blocking (see the
   * comment at the bottom of this method).
   *
   * On completion (success or failure) the executor is shut down, partial loads are cleaned,
   * and the system-level compaction lock is released (except for IUD update-delta compaction,
   * whose lock is managed by the caller).
   *
   * @param sqlContext        spark sql context
   * @param carbonLoadModel   load model of the triggering table
   * @param storeLocation     temporary store location used during compaction
   * @param compactionModel   model describing this compaction request
   * @param compactionLock    system-level lock to release when done
   * @param compactedSegments out parameter: segment ids produced by compaction
   * @param operationContext  event/listener context
   */
  def startCompactionThreads(sqlContext: SQLContext,
      carbonLoadModel: CarbonLoadModel,
      storeLocation: String,
      compactionModel: CompactionModel,
      compactionLock: ICarbonLock,
      compactedSegments: java.util.List[String],
      operationContext: OperationContext): Unit = {
    val executor: ExecutorService = Executors.newFixedThreadPool(1)
    // update the updated table status.
    if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
      // update the updated table status. For the case of Update Delta Compaction the Metadata
      // is filled in LoadModel, no need to refresh.
      carbonLoadModel.readAndSetLoadMetadataDetails()
    }

    val compactionThread = new Thread {
      override def run(): Unit = {
        val compactor = CompactionFactory.getCompactor(
          carbonLoadModel,
          compactionModel,
          executor,
          sqlContext,
          storeLocation,
          compactedSegments,
          operationContext)
        try {
          // compaction status of the table which is triggered by the user.
          var triggeredCompactionStatus = false
          var exception: Exception = null
          try {
            compactor.executeCompaction()
            triggeredCompactionStatus = true
          } catch {
            case e: Exception =>
              LOGGER.error(s"Exception in compaction thread ${ e.getMessage }")
              exception = e
          }
          // continue in case of exception also, check for all the tables.
          val isConcurrentCompactionAllowed = CarbonProperties.getInstance().getProperty(
            CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
            CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
          ).equalsIgnoreCase("true")

          if (!isConcurrentCompactionAllowed) {
            LOGGER.info("System level compaction lock is enabled.")
            // tables whose compaction-required marker could not be deleted are
            // skipped on subsequent iterations to avoid an infinite loop
            val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
            var tableForCompaction = CarbonCompactionUtil.getNextTableToCompact(
              CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore
                .listAllTables(sqlContext.sparkSession).toArray,
              skipCompactionTables.toList.asJava)
            while (null != tableForCompaction) {
              LOGGER.info("Compaction request has been identified for table " +
                s"${ tableForCompaction.getDatabaseName }." +
                s"${ tableForCompaction.getTableName}")
              val table: CarbonTable = tableForCompaction
              val metadataPath = table.getMetadataPath
              val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)

              val newCarbonLoadModel = prepareCarbonLoadModel(table)

              // NOTE(review): this passes the triggering table's carbonLoadModel, not
              // newCarbonLoadModel, for the size computation — confirm this is intended
              val compactionSize = CarbonDataMergerUtil
                .getCompactionSize(CompactionType.MAJOR, carbonLoadModel)

              val newcompactionModel = CompactionModel(
                compactionSize,
                compactionType,
                table,
                compactionModel.isDDLTrigger,
                CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
                  TableIdentifier(table.getTableName,
                    Some(table.getDatabaseName))), None)
              // proceed for compaction
              try {
                CompactionFactory.getCompactor(
                  newCarbonLoadModel,
                  newcompactionModel,
                  executor,
                  sqlContext,
                  storeLocation,
                  compactedSegments,
                  operationContext).executeCompaction()
              } catch {
                case e: Exception =>
                  LOGGER.error("Exception in compaction thread for table " +
                    s"${ tableForCompaction.getDatabaseName }." +
                    s"${ tableForCompaction.getTableName }")
                // not handling the exception. only logging as this is not the table triggered
                // by user.
              } finally {
                // delete the compaction required file in case of failure or success also.
                if (!CarbonCompactionUtil
                  .deleteCompactionRequiredFile(metadataPath, compactionType)) {
                  // if the compaction request file is not been able to delete then
                  // add those tables details to the skip list so that it wont be considered next.
                  skipCompactionTables.+=:(tableForCompaction.getCarbonTableIdentifier)
                  LOGGER.error("Compaction request file can not be deleted for table " +
                    s"${ tableForCompaction.getDatabaseName }." +
                    s"${ tableForCompaction.getTableName }")
                }
              }
              // ********* check again for all the tables.
              tableForCompaction = CarbonCompactionUtil.getNextTableToCompact(
                CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore
                  .listAllTables(sqlContext.sparkSession).toArray,
                skipCompactionTables.asJava)
            }
          }
          // Remove compacted segments from executor cache.
          if (CarbonProperties.getInstance().isDistributedPruningEnabled(
            carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)) {
            try {
              IndexServer.getClient.invalidateSegmentCache(carbonLoadModel
                .getCarbonDataLoadSchema.getCarbonTable,
                compactedSegments.asScala.toArray,
                SparkSQLUtil.getTaskGroupId(sqlContext.sparkSession))
            } catch {
              case ex: Exception =>
                // cache invalidation is best-effort; compaction itself succeeded
                LOGGER.warn(s"Clear cache job has failed for ${carbonLoadModel
                  .getDatabaseName}.${carbonLoadModel.getTableName}", ex)
            }
          }
          // giving the user his error for telling in the beeline if his triggered table
          // compaction is failed.
          if (!triggeredCompactionStatus) {
            throw new Exception("Exception in compaction " + exception.getMessage)
          }
        } finally {
          executor.shutdownNow()
          compactor.deletePartialLoadsInCompaction()
          if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
            compactionLock.unlock()
          }
        }
      }
    }
    // calling the run method of a thread to make the call as blocking call.
    // in the future we may make this as concurrent.
    compactionThread.run()
  }
| |
| private def prepareCarbonLoadModel( |
| table: CarbonTable |
| ): CarbonLoadModel = { |
| val loadModel = new CarbonLoadModel |
| loadModel.setTableName(table.getTableName) |
| val dataLoadSchema = new CarbonDataLoadSchema(table) |
| // Need to fill dimension relation |
| loadModel.setCarbonDataLoadSchema(dataLoadSchema) |
| loadModel.setTableName(table.getCarbonTableIdentifier.getTableName) |
| loadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName) |
| loadModel.setTablePath(table.getTablePath) |
| loadModel.setCarbonTransactionalTable(table.isTransactionalTable) |
| loadModel.readAndSetLoadMetadataDetails() |
| val loadStartTime = CarbonUpdateUtil.readCurrentTime() |
| loadModel.setFactTimeStamp(loadStartTime) |
| val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala |
| .getOrElse(CarbonCommonConstants.COMPRESSOR, |
| CompressorFactory.getInstance().getCompressor.getName) |
| loadModel.setColumnCompressor(columnCompressor) |
| loadModel |
| } |
| |
  /**
   * Entry point for loading data into a carbon table. Depending on the inputs this
   * dispatches to range-sort, global-sort, dataframe or file-based load flows, or to
   * the UPDATE flow when an updateModel is supplied. After the load it handles table
   * status updates, bad-record/failure cleanup, segment file writing, pre/post status
   * update events, pre-priming of the index cache and auto-compaction.
   *
   * @param sqlContext      spark sql context
   * @param carbonLoadModel load model describing the load
   * @param columnar        whether the load is columnar (passed through by callers)
   * @param partitionStatus status propagated from partition-level processing
   * @param result          optional dictionary server (single-pass dictionary writing)
   * @param overwriteTable  true for INSERT OVERWRITE semantics
   * @param hadoopConf      hadoop configuration for the job
   * @param dataFrame       optional input dataframe (None for file-based load)
   * @param updateModel     present when this load is part of an UPDATE command
   * @param operationContext event/listener context; may carry a "uuid" property
   * @return the written segment's metadata details, or null for the UPDATE flow and on
   *         certain failure paths (callers of the update flow read errors from updateModel)
   */
  def loadCarbonData(
      sqlContext: SQLContext,
      carbonLoadModel: CarbonLoadModel,
      columnar: Boolean,
      partitionStatus: SegmentStatus = SegmentStatus.SUCCESS,
      result: Option[DictionaryServer],
      overwriteTable: Boolean,
      hadoopConf: Configuration,
      dataFrame: Option[DataFrame] = None,
      updateModel: Option[UpdateTableModel] = None,
      operationContext: OperationContext): LoadMetadataDetails = {
    // Check if any load need to be deleted before loading new data
    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    // per-block statuses for the normal load flow
    var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
    // per-segment results for the update flow
    var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null

    // create new segment folder in carbon store
    if (updateModel.isEmpty && carbonLoadModel.isCarbonTransactionalTable) {
      CarbonLoaderUtil.checkAndCreateCarbonDataLocation(carbonLoadModel.getSegmentId, carbonTable)
    }
    var loadStatus = SegmentStatus.SUCCESS
    var errorMessage: String = "DataLoad failure"
    var executorMessage: String = ""
    val isSortTable = carbonTable.getNumberOfSortColumns > 0
    val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)

    // segment-level lock to guard concurrent operations on the same segment
    val segmentLock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
      CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) + LockUsage.LOCK)

    try {
      // non-transactional tables skip segment locking entirely
      if (!carbonLoadModel.isCarbonTransactionalTable || segmentLock.lockWithRetries()) {
        if (updateModel.isDefined) {
          // UPDATE flow: load updated rows into their original segments
          res = loadDataFrameForUpdate(
            sqlContext,
            dataFrame,
            carbonLoadModel,
            updateModel,
            carbonTable,
            hadoopConf)
          res.foreach { resultOfSeg =>
            resultOfSeg.foreach { resultOfBlock =>
              if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
                loadStatus = SegmentStatus.LOAD_FAILURE
                if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
                  updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
                  updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
                } else {
                  updateModel.get.executorErrors = resultOfBlock._2._2
                }
              } else if (resultOfBlock._2._1.getSegmentStatus ==
                         SegmentStatus.LOAD_PARTIAL_SUCCESS) {
                loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
                updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
                updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
              }
            }
          }
        } else {
          // normal load flow: pick the loader based on sort scope and input kind
          status = if (dataFrame.isEmpty && isSortTable &&
                       carbonLoadModel.getRangePartitionColumn != null &&
                       (sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) ||
                        sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT))) {
            DataLoadProcessBuilderOnSpark
              .loadDataUsingRangeSort(sqlContext.sparkSession, carbonLoadModel, hadoopConf)
          } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
            DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
              dataFrame, carbonLoadModel, hadoopConf)
          } else if (dataFrame.isDefined) {
            loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
          } else {
            loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
          }
          // aggregate per-block statuses into a single overall load status
          val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
          if (status.nonEmpty) {
            status.foreach { eachLoadStatus =>
              val state = newStatusMap.get(eachLoadStatus._1)
              state match {
                case Some(SegmentStatus.LOAD_FAILURE) =>
                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
                case Some(SegmentStatus.LOAD_PARTIAL_SUCCESS)
                  if eachLoadStatus._2._1.getSegmentStatus ==
                     SegmentStatus.SUCCESS =>
                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
                case _ =>
                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getSegmentStatus)
              }
            }

            newStatusMap.foreach {
              case (key, value) =>
                if (value == SegmentStatus.LOAD_FAILURE) {
                  loadStatus = SegmentStatus.LOAD_FAILURE
                } else if (value == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
                           loadStatus != SegmentStatus.LOAD_FAILURE) {
                  loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
                }
            }
          } else {
            // if no value is there in data load, make load status Success
            // and data load flow executes
            if (dataFrame.isDefined && updateModel.isEmpty) {
              val rdd = dataFrame.get.rdd
              if (rdd.partitions == null || rdd.partitions.length == 0) {
                LOGGER.warn("DataLoading finished. No data was loaded.")
                loadStatus = SegmentStatus.SUCCESS
              }
            } else {
              loadStatus = SegmentStatus.LOAD_FAILURE
            }
          }

          if (loadStatus != SegmentStatus.LOAD_FAILURE &&
              partitionStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
            loadStatus = partitionStatus
          }
        }
      }
    } catch {
      case ex: Throwable =>
        loadStatus = SegmentStatus.LOAD_FAILURE
        val (extrMsgLocal, errorMsgLocal) = CarbonScalaUtil.retrieveAndLogErrorMsg(ex, LOGGER)
        executorMessage = extrMsgLocal
        errorMessage = errorMsgLocal
        LOGGER.info(errorMessage)
        LOGGER.error(ex)
    } finally {
      segmentLock.unlock()
    }
    // handle the status file updation for the update cmd.
    if (updateModel.isDefined) {
      if (loadStatus == SegmentStatus.LOAD_FAILURE) {
        CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage)
        return null
      } else if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
                 updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
                 carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
        return null
      } else {
        // in success case handle updation of the table status file.
        // success case.
        // write the dictionary file in case of single_pass true
        writeDictionary(carbonLoadModel, result, false)
        val segmentDetails = new util.HashSet[Segment]()
        var resultSize = 0
        res.foreach { resultOfSeg =>
          resultSize = resultSize + resultOfSeg.size
          resultOfSeg.foreach { resultOfBlock =>
            segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName))
          }
        }
        val segmentFiles = updateSegmentFiles(carbonTable, segmentDetails, updateModel.get)

        // this means that the update doesn't have any records to update so no need to do table
        // status file updation.
        if (resultSize == 0) {
          return null
        }
        if (CarbonUpdateUtil.updateTableMetadataStatus(
          segmentDetails,
          carbonTable,
          updateModel.get.updatedTimeStamp + "",
          true,
          new util.ArrayList[Segment](0),
          new util.ArrayList[Segment](segmentFiles), "")) {
          // status updated successfully: nothing more to do here
        } else {
          LOGGER.error("Data update failed due to failure in table status updation.")
          updateModel.get.executorErrors.errorMsg = errorMessage
          updateModel.get.executorErrors.failureCauses = FailureCauses
            .STATUS_FILE_UPDATION_FAILURE
          return null
        }
      }
      // update flow always returns null; results are reported through updateModel
      return null
    }
    val uniqueTableStatusId = Option(operationContext.getProperty("uuid")).getOrElse("")
      .asInstanceOf[String]
    if (loadStatus == SegmentStatus.LOAD_FAILURE) {
      // update the load entry in table status file for changing the status to marked for delete
      CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
      LOGGER.info("********starting clean up**********")
      if (carbonLoadModel.isCarbonTransactionalTable) {
        // delete segment is applicable for transactional table
        CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
        clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
      }
      LOGGER.info("********clean up done**********")
      LOGGER.warn("Cannot write load metadata file as data load failed")
      throw new Exception(errorMessage)
    } else {
      // check if data load fails due to bad record and throw data load failure due to
      // bad record exception
      if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
          status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
          carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
        // update the load entry in table status file for changing the status to marked for delete
        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
        LOGGER.info("********starting clean up**********")
        if (carbonLoadModel.isCarbonTransactionalTable) {
          // delete segment is applicable for transactional table
          CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
          clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
        }
        LOGGER.info("********clean up done**********")
        throw new Exception(status(0)._2._2.errorMsg)
      }
      // as no record loaded in new segment, new segment should be deleted
      val newEntryLoadStatus =
        if (carbonLoadModel.isCarbonTransactionalTable &&
            !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildTableForMV &&
            !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
          LOGGER.warn("Cannot write load metadata file as there is no data to load")
          SegmentStatus.MARKED_FOR_DELETE
        } else {
          loadStatus
        }

      writeDictionary(carbonLoadModel, result, writeAll = false)

      // persist the segment file before updating the table status so that readers
      // never see a status entry without its segment file
      val segmentFileName =
        SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
          String.valueOf(carbonLoadModel.getFactTimeStamp))

      SegmentFileStore.updateTableStatusFile(
        carbonTable,
        carbonLoadModel.getSegmentId,
        segmentFileName,
        carbonTable.getCarbonTableIdentifier.getTableId,
        new SegmentFileStore(carbonTable.getTablePath, segmentFileName))

      operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
        carbonLoadModel.getSegmentId)
      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
        new LoadTablePreStatusUpdateEvent(
          carbonTable.getCarbonTableIdentifier,
          carbonLoadModel)
      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
      val (done, writtenSegment) =
        updateTableStatus(
          status,
          carbonLoadModel,
          newEntryLoadStatus,
          overwriteTable,
          segmentFileName,
          uniqueTableStatusId)
      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
        new LoadTablePostStatusUpdateEvent(carbonLoadModel)
      val commitComplete = try {
        OperationListenerBus.getInstance()
          .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
        true
      } catch {
        case ex: Exception =>
          LOGGER.error("Problem while committing data maps", ex)
          false
      }
      // roll back on table-status or datamap-commit failure
      if (!done || !commitComplete) {
        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
        LOGGER.info("********starting clean up**********")
        if (carbonLoadModel.isCarbonTransactionalTable) {
          // delete segment is applicable for transactional table
          CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
          // delete corresponding segment file from metadata
          val segmentFile = CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
                            File.separator + segmentFileName
          FileFactory.deleteFile(segmentFile)
          clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
        }
        LOGGER.info("********clean up done**********")
        LOGGER.error("Data load failed due to failure in table status updation.")
        throw new Exception("Data load failed due to failure in table status updation.")
      }
      if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
        LOGGER.info("Data load is partially successful for " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
      } else {
        LOGGER.info("Data load is successful for " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
      }

      // code to handle Pre-Priming cache for loading

      if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
        DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
          operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
      }
      try {
        // compaction handling
        if (carbonTable.isHivePartitionTable) {
          carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
        }
        val compactedSegments = new util.ArrayList[String]()
        handleSegmentMerging(sqlContext,
          carbonLoadModel
            .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
          carbonTable,
          compactedSegments,
          operationContext)
        carbonLoadModel.setMergedSegmentIds(compactedSegments)
        writtenSegment
      } catch {
        case e: Exception =>
          // auto-compaction failure must not fail an otherwise successful load
          LOGGER.error(
            "Auto-Compaction has failed. Ignoring this exception because the" +
            " load is passed.", e)
          writtenSegment
      }
    }
  }
| |
| /** |
| * clear datamap files for segment |
| */ |
| def clearDataMapFiles(carbonTable: CarbonTable, segmentId: String): Unit = { |
| try { |
| val segments = List(new Segment(segmentId)).asJava |
| DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala |
| .filter(_.getDataMapSchema.isIndexDataMap) |
| .foreach(_.deleteDatamapData(segments)) |
| } catch { |
| case ex : Exception => |
| LOGGER.error(s"Failed to clear datamap files for" + |
| s" ${carbonTable.getDatabaseName}.${carbonTable.getTableName}") |
| } |
| } |
| /** |
| * Add and update the segment files. In case of update scenario the carbonindex files are written |
| * to the same segment so we need to update old segment file. So this ethod writes the latest data |
| * to new segment file and merges this file old file to get latest updated files. |
| * @param carbonTable |
| * @param segmentDetails |
| * @return |
| */ |
  private def updateSegmentFiles(
      carbonTable: CarbonTable,
      segmentDetails: util.HashSet[Segment],
      updateModel: UpdateTableModel) = {
    // current table status entries, used to locate each segment's existing segment file
    val metadataDetails =
      SegmentStatusManager.readTableStatusFile(
        CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
    val segmentFiles = segmentDetails.asScala.map { seg =>
      // NOTE(review): .get assumes every updated segment has a table-status entry —
      // a missing entry would throw NoSuchElementException here
      val load =
        metadataDetails.find(_.getLoadName.equals(seg.getSegmentNo)).get
      val segmentFile = load.getSegmentFile
      // files to be merged: the pre-existing segment file (if any) plus the new one
      var segmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]

      // write a new segment file containing the index files produced by this update
      val file = SegmentFileStore.writeSegmentFile(
        carbonTable,
        seg.getSegmentNo,
        String.valueOf(System.currentTimeMillis()),
        load.getPath)

      if (segmentFile != null) {
        segmentFiles ++= FileFactory.getCarbonFile(
          SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFile)) :: Nil
      }
      val updatedSegFile = if (file != null) {
        val carbonFile = FileFactory.getCarbonFile(
          SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, file))
        segmentFiles ++= carbonFile :: Nil

        // merge old + new segment files into a single file named after the update timestamp
        val mergedSegFileName = SegmentFileStore.genSegmentFileName(
          seg.getSegmentNo,
          updateModel.updatedTimeStamp.toString)
        SegmentFileStore.mergeSegmentFiles(
          mergedSegFileName,
          CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
          segmentFiles.toArray)
        // the intermediate (new) segment file is no longer needed after the merge
        carbonFile.delete()
        mergedSegFileName + CarbonTablePath.SEGMENT_EXT
      } else {
        null
      }

      new Segment(seg.getSegmentNo, updatedSegFile)
    }.filter(_.getSegmentFileName != null).asJava
    segmentFiles
  }
| |
| /** |
| * If data load is triggered by UPDATE query, this func will execute the update |
| * TODO: move it to a separate update command |
| */ |
  private def loadDataFrameForUpdate(
      sqlContext: SQLContext,
      dataFrame: Option[DataFrame],
      carbonLoadModel: CarbonLoadModel,
      updateModel: Option[UpdateTableModel],
      carbonTable: CarbonTable,
      hadoopConf: Configuration): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
    val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate

    // rows to update; the last column of each row carries the target segment id
    val updateRdd = dataFrame.get.rdd

    // return directly if no rows to update
    val noRowsToUpdate = updateRdd.isEmpty()
    if (noRowsToUpdate) {
      Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]()
    } else {
      // splitting as (key, value) i.e., (segment, updatedRows)
      val keyRDD = updateRdd.map(row =>
        (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))

      // only successful / partially-successful segments are candidates for update
      val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
        carbonTable.getMetadataPath)
        .filter(lmd => lmd.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS) ||
                       lmd.getSegmentStatus.equals(SegmentStatus.SUCCESS))
      val segments = loadMetadataDetails.map(f => new Segment(f.getLoadName, f.getSegmentFile))
      val segmentIdIndex = segments.map(_.getSegmentNo).zipWithIndex.toMap
      // highest existing task id per segment, so new carbondata files get fresh task numbers
      val segmentId2maxTaskNo = segments.map { seg =>
        (seg.getSegmentNo,
          CarbonUpdateUtil.getLatestTaskIdForSegment(seg, carbonLoadModel.getTablePath))
      }.toMap

      // maps each segment to `parallelism` consecutive partition ids, distributing the
      // segment's rows randomly across them
      class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
        extends org.apache.spark.Partitioner {
        override def numPartitions: Int = segmentIdIndex.size * parallelism

        override def getPartition(key: Any): Int = {
          val segId = key.asInstanceOf[String]
          segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
        }
      }

      val partitionByRdd = keyRDD.partitionBy(
        new SegmentPartitioner(segmentIdIndex, segmentUpdateParallelism))

      // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
      // so segmentIdIndex=partitionId/parallelism, this has been verified.
      val conf = SparkSQLUtil.broadCastHadoopConf(sqlContext.sparkSession.sparkContext, hadoopConf)
      partitionByRdd.map(_._2).mapPartitions { partition =>
        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
        val partitionId = TaskContext.getPartitionId()
        // recover the segment index and the random sub-partition from the partition id
        val segIdIndex = partitionId / segmentUpdateParallelism
        val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
        val segId = segments(segIdIndex)
        val newTaskNo = segmentId2maxTaskNo(segId.getSegmentNo) + randomPart + 1
        List(triggerDataLoadForSegment(
          carbonLoadModel,
          updateModel,
          segId.getSegmentNo,
          newTaskNo,
          partition).toList).toIterator
      }.collect()
    }
  }
| |
| /** |
| * TODO: move it to a separate update command |
| */ |
  private def triggerDataLoadForSegment(
      carbonLoadModel: CarbonLoadModel,
      updateModel: Option[UpdateTableModel],
      key: String,
      taskNo: Long,
      iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
    val rddResult = new updateResultImpl()
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    // NOTE: the data load itself runs eagerly inside this anonymous Iterator's
    // constructor; the iterator merely yields the single (statusId, result) pair
    val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
      val loadMetadataDetails = new LoadMetadataDetails
      val executionErrors = ExecutionErrors(FailureCauses.NONE, "")
      var uniqueLoadStatusId = ""
      try {
        val segId = key
        val index = taskNo
        uniqueLoadStatusId = carbonLoadModel.getTableName +
                             CarbonCommonConstants.UNDERSCORE +
                             (index + "_0")

        loadMetadataDetails.setLoadName(segId)
        loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
        carbonLoadModel.setSegmentId(segId)
        carbonLoadModel.setTaskNo(String.valueOf(index))
        carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)

        // optimistically mark SUCCESS before the load; the catch blocks below
        // downgrade the status if the load fails
        loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
        UpdateDataLoad.DataLoadForUpdate(segId,
          index,
          iter,
          carbonLoadModel,
          loadMetadataDetails)
      } catch {
        case e: NoRetryException =>
          // bad records with no-retry: report partial success, record the cause
          loadMetadataDetails
            .setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
          executionErrors.failureCauses = FailureCauses.BAD_RECORDS
          executionErrors.errorMsg = e.getMessage
          LOGGER.info("Bad Record Found")
        case e: Exception =>
          LOGGER.info("DataLoad failure")
          LOGGER.error(e)
          throw e
      }

      var finished = false

      override def hasNext: Boolean = !finished

      override def next(): (String, (LoadMetadataDetails, ExecutionErrors)) = {
        finished = true
        rddResult
          .getKey(uniqueLoadStatusId,
            (loadMetadataDetails, executionErrors))
      }
    }
    resultIter
  }
| |
| /** |
| * Trigger to write dictionary files |
| */ |
| private def writeDictionary(carbonLoadModel: CarbonLoadModel, |
| result: Option[DictionaryServer], writeAll: Boolean): Unit = { |
| // write dictionary file |
| val uniqueTableName: String = |
| CarbonTable.buildUniqueName(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName) |
| result match { |
| case Some(server) => |
| try { |
| server.writeTableDictionary(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable |
| .getCarbonTableIdentifier.getTableId) |
| } catch { |
| case _: Exception => |
| LOGGER.error(s"Error while writing dictionary file for $uniqueTableName") |
| throw new Exception("Dataload failed due to error while writing dictionary file!") |
| } |
| case _ => |
| } |
| } |
| |
| /** |
| * Trigger compaction after data load |
| */ |
| def handleSegmentMerging( |
| sqlContext: SQLContext, |
| carbonLoadModel: CarbonLoadModel, |
| carbonTable: CarbonTable, |
| compactedSegments: java.util.List[String], |
| operationContext: OperationContext): Unit = { |
| LOGGER.info(s"compaction need status is" + |
| s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }") |
| if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) { |
| val compactionSize = 0 |
| val isCompactionTriggerByDDl = false |
| val compactionModel = CompactionModel( |
| compactionSize, |
| CompactionType.MINOR, |
| carbonTable, |
| isCompactionTriggerByDDl, |
| CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, |
| TableIdentifier(carbonTable.getTableName, |
| Some(carbonTable.getDatabaseName))), None) |
| var storeLocation = "" |
| val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf) |
| if (null != configuredStore && configuredStore.nonEmpty) { |
| storeLocation = configuredStore(Random.nextInt(configuredStore.length)) |
| } |
| if (storeLocation == null) { |
| storeLocation = System.getProperty("java.io.tmpdir") |
| } |
| storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() |
| |
| val isConcurrentCompactionAllowed = CarbonProperties.getInstance().getProperty( |
| CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, |
| CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION |
| ).equalsIgnoreCase("true") |
| |
| if (!isConcurrentCompactionAllowed) { |
| handleCompactionForSystemLocking(sqlContext, |
| carbonLoadModel, |
| storeLocation, |
| CompactionType.MINOR, |
| carbonTable, |
| compactedSegments, |
| compactionModel, |
| operationContext |
| ) |
| } else { |
| val lock = CarbonLockFactory.getCarbonLockObj( |
| carbonTable.getAbsoluteTableIdentifier, |
| LockUsage.COMPACTION_LOCK) |
| val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable |
| .getAbsoluteTableIdentifier, LockUsage.UPDATE_LOCK) |
| try { |
| if (updateLock.lockWithRetries(3, 3)) { |
| if (lock.lockWithRetries()) { |
| LOGGER.info("Acquired the compaction lock.") |
| startCompactionThreads(sqlContext, |
| carbonLoadModel, |
| storeLocation, |
| compactionModel, |
| lock, |
| compactedSegments, |
| operationContext |
| ) |
| } else { |
| LOGGER.error("Not able to acquire the compaction lock for table " + |
| s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}") |
| } |
| } else { |
| throw new ConcurrentOperationException(carbonTable, "update", "compaction") |
| } |
| } catch { |
| case e: Exception => |
| LOGGER.error(s"Exception in start compaction thread.", e) |
| lock.unlock() |
| throw e |
| } finally { |
| updateLock.unlock() |
| } |
| } |
| } |
| } |
| |
| /** |
| * Update table status file after data loading |
| * @param status status collected from each task |
| * @param carbonLoadModel load model used for loading |
| * @param newEntryLoadStatus segment status to set in the metadata |
| * @param overwriteTable true the operation is overwrite |
| * @param segmentFileName segment file name |
| * @param uuid uuid for the table status file name |
| * @return whether operation success and |
| * the segment metadata that written into the segment status file |
| */ |
| private def updateTableStatus( |
| status: Array[(String, (LoadMetadataDetails, ExecutionErrors))], |
| carbonLoadModel: CarbonLoadModel, |
| newEntryLoadStatus: SegmentStatus, |
| overwriteTable: Boolean, |
| segmentFileName: String, |
| uuid: String = ""): (Boolean, LoadMetadataDetails) = { |
| val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable |
| val metadataDetails = if (status != null && status.size > 0 && status(0) != null) { |
| status(0)._2._1 |
| } else { |
| new LoadMetadataDetails |
| } |
| metadataDetails.setSegmentFile(segmentFileName) |
| CarbonLoaderUtil.populateNewLoadMetaEntry( |
| metadataDetails, |
| newEntryLoadStatus, |
| carbonLoadModel.getFactTimeStamp, |
| true) |
| CarbonLoaderUtil |
| .addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable) |
| |
| if (!carbonLoadModel.isCarbonTransactionalTable && overwriteTable) { |
| CarbonLoaderUtil.deleteNonTransactionalTableForInsertOverwrite(carbonLoadModel) |
| } |
| val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false, |
| overwriteTable, uuid) |
| if (!done) { |
| val errorMessage = s"Dataload failed due to failure in table status updation for" + |
| s" ${carbonLoadModel.getTableName}" |
| LOGGER.error(errorMessage) |
| throw new Exception(errorMessage) |
| } else { |
| DataMapStatusManager.disableAllLazyDataMaps(carbonTable) |
| if (overwriteTable) { |
| val allDataMapSchemas = DataMapStoreManager.getInstance |
| .getDataMapSchemasOfTable(carbonTable).asScala |
| .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier && |
| !dataMapSchema.isIndexDataMap).asJava |
| if (!allDataMapSchemas.isEmpty) { |
| DataMapStatusManager.truncateDataMap(allDataMapSchemas) |
| } |
| } |
| } |
| (done, metadataDetails) |
| } |
| |
| /** |
| * Execute load process to load from input dataframe |
| */ |
| private def loadDataFrame( |
| sqlContext: SQLContext, |
| dataFrame: Option[DataFrame], |
| carbonLoadModel: CarbonLoadModel |
| ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { |
| try { |
| val rdd = dataFrame.get.rdd |
| |
| val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p => |
| DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host) |
| }.distinct.length |
| val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList( |
| nodeNumOfData, |
| sqlContext.sparkContext) |
| val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, nodes.toArray |
| .distinct) |
| |
| new NewDataFrameLoaderRDD( |
| sqlContext.sparkSession, |
| new DataLoadResultImpl(), |
| carbonLoadModel, |
| newRdd |
| ).collect() |
| } catch { |
| case ex: Exception => |
| LOGGER.error("load data frame failed", ex) |
| throw ex |
| } |
| } |
| |
| /** |
| * Execute load process to load from input file path specified in `carbonLoadModel` |
| */ |
| private def loadDataFile( |
| sqlContext: SQLContext, |
| carbonLoadModel: CarbonLoadModel, |
| hadoopConf: Configuration |
| ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { |
| /* |
| * when data load handle by node partition |
| * 1)clone the hadoop configuration,and set the file path to the configuration |
| * 2)use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get splits,size info |
| * 3)use CarbonLoaderUtil.nodeBlockMapping to get mapping info of node and block, |
| * for locally writing carbondata files(one file one block) in nodes |
| * 4)use NewCarbonDataLoadRDD to load data and write to carbondata files |
| */ |
| // FileUtils will skip file which is no csv, and return all file path which split by ',' |
| val filePaths = carbonLoadModel.getFactFilePath |
| hadoopConf.set(FileInputFormat.INPUT_DIR, filePaths) |
| hadoopConf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true") |
| hadoopConf.set("io.compression.codecs", |
| """org.apache.hadoop.io.compress.GzipCodec, |
| org.apache.hadoop.io.compress.DefaultCodec, |
| org.apache.hadoop.io.compress.BZip2Codec""".stripMargin) |
| |
| CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConf) |
| val jobConf = new JobConf(hadoopConf) |
| SparkHadoopUtil.get.addCredentials(jobConf) |
| val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat |
| val jobContext = new Job(jobConf) |
| val rawSplits = inputFormat.getSplits(jobContext).toArray |
| val blockList = rawSplits.map { inputSplit => |
| val fileSplit = inputSplit.asInstanceOf[FileSplit] |
| new TableBlockInfo(fileSplit.getPath.toString, |
| fileSplit.getStart, "1", |
| fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null |
| ).asInstanceOf[Distributable] |
| } |
| // group blocks to nodes, tasks |
| val startTime = System.currentTimeMillis |
| val activeNodes = DistributionUtil |
| .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext) |
| val skewedDataOptimization = CarbonProperties.getInstance() |
| .isLoadSkewedDataOptimizationEnabled() |
| // get user ddl input the node loads the smallest amount of data |
| val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable |
| var loadMinSize = carbonLoadModel.getLoadMinSize() |
| if (loadMinSize.equalsIgnoreCase(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT)) { |
| loadMinSize = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala |
| .getOrElse(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB, |
| CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT) |
| } |
| |
| val blockAssignStrategy = if (!loadMinSize.equalsIgnoreCase( |
| CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB_DEFAULT)) { |
| CarbonLoaderUtil.BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST |
| } else if (skewedDataOptimization) { |
| CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST |
| } else { |
| CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST |
| } |
| LOGGER.info(s"Allocating block to nodes using strategy: $blockAssignStrategy") |
| |
| val nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(blockList.toSeq.asJava, -1, |
| activeNodes.toList.asJava, blockAssignStrategy, loadMinSize).asScala.toSeq |
| val timeElapsed: Long = System.currentTimeMillis - startTime |
| LOGGER.info("Total Time taken in block allocation: " + timeElapsed) |
| LOGGER.info(s"Total no of blocks: ${ blockList.length }, " + |
| s"No.of Nodes: ${nodeBlockMapping.size}") |
| var str = "" |
| nodeBlockMapping.foreach { entry => |
| val tableBlock = entry._2 |
| val totalSize = tableBlock.asScala.map(_.asInstanceOf[TableBlockInfo].getBlockLength).sum |
| str = str + "#Node: " + entry._1 + ", no.of.blocks: " + tableBlock.size() + |
| f", totalsize.of.blocks: ${totalSize * 0.1 * 10 / 1024 /1024}%.2fMB" |
| tableBlock.asScala.foreach(tableBlockInfo => |
| if (!tableBlockInfo.getLocations.exists(hostentry => |
| hostentry.equalsIgnoreCase(entry._1) |
| )) { |
| str = str + " , mismatch locations: " + tableBlockInfo.getLocations |
| .foldLeft("")((a, b) => a + "," + b) |
| } |
| ) |
| str = str + "\n" |
| } |
| LOGGER.info(str) |
| val blocksGroupBy: Array[(String, Array[BlockDetails])] = nodeBlockMapping.map { entry => |
| val blockDetailsList = |
| entry._2.asScala.map(distributable => { |
| val tableBlock = distributable.asInstanceOf[TableBlockInfo] |
| new BlockDetails(new Path(tableBlock.getFilePath), |
| tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations |
| ) |
| }).toArray |
| (entry._1, blockDetailsList) |
| }.toArray |
| |
| new NewCarbonDataLoadRDD( |
| sqlContext.sparkSession, |
| new DataLoadResultImpl(), |
| carbonLoadModel, |
| blocksGroupBy |
| ).collect() |
| } |
| } |