| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.spark.rdd |
| |
| import java.text.SimpleDateFormat |
| import java.util |
| import java.util.concurrent._ |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable.ListBuffer |
| import scala.util.Random |
| import scala.util.control.Breaks._ |
| |
| import org.apache.hadoop.conf.Configuration |
| import org.apache.hadoop.fs.Path |
| import org.apache.hadoop.io.NullWritable |
| import org.apache.hadoop.mapreduce.Job |
| import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} |
| import org.apache.spark.{SparkEnv, SparkException, TaskContext} |
| import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD} |
| import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext} |
| import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel} |
| import org.apache.spark.sql.hive.DistributionUtil |
| import org.apache.spark.util.SparkUtil |
| |
| import org.apache.carbondata.common.constants.LoggerAction |
| import org.apache.carbondata.common.logging.LogServiceFactory |
| import org.apache.carbondata.core.constants.CarbonCommonConstants |
| import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo} |
| import org.apache.carbondata.core.dictionary.server.DictionaryServer |
| import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} |
| import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion} |
| import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} |
| import org.apache.carbondata.core.metadata.schema.partition.PartitionType |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable |
| import org.apache.carbondata.core.mutate.CarbonUpdateUtil |
| import org.apache.carbondata.core.scan.partition.PartitionUtil |
| import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} |
| import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties} |
| import org.apache.carbondata.core.util.path.CarbonStorePath |
| import org.apache.carbondata.processing.exception.DataLoadingException |
| import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses} |
| import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable} |
| import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException} |
| import org.apache.carbondata.processing.loading.model.CarbonLoadModel |
| import org.apache.carbondata.processing.loading.sort.SortScopeOptions |
| import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} |
| import org.apache.carbondata.processing.splits.TableSplit |
| import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil, CarbonQueryUtil} |
| import org.apache.carbondata.spark._ |
| import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark |
| import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util} |
| |
| /** |
| * This is the factory class which can create different RDD depends on user needs. |
| * |
| */ |
| object CarbonDataRDDFactory { |
| |
| private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) |
| |
| def alterTableForCompaction(sqlContext: SQLContext, |
| alterTableModel: AlterTableModel, |
| carbonLoadModel: CarbonLoadModel, |
| storePath: String, |
| storeLocation: String): Unit = { |
| var compactionSize: Long = 0 |
| var compactionType: CompactionType = CompactionType.MINOR_COMPACTION |
| if (alterTableModel.compactionType.equalsIgnoreCase("major")) { |
| compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION) |
| compactionType = CompactionType.MAJOR_COMPACTION |
| } else if (alterTableModel.compactionType |
| .equalsIgnoreCase(CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString)) { |
| compactionType = CompactionType.IUD_UPDDEL_DELTA_COMPACTION |
| if (alterTableModel.segmentUpdateStatusManager.get != None) { |
| carbonLoadModel |
| .setSegmentUpdateStatusManager((alterTableModel.segmentUpdateStatusManager.get)) |
| carbonLoadModel |
| .setLoadMetadataDetails(alterTableModel.segmentUpdateStatusManager.get |
| .getLoadMetadataDetails.toList.asJava) |
| } |
| } |
| else { |
| compactionType = CompactionType.MINOR_COMPACTION |
| } |
| |
| LOGGER.audit(s"Compaction request received for table " + |
| s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") |
| val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable |
| |
| if (null == carbonLoadModel.getLoadMetadataDetails) { |
| CommonUtil.readLoadMetadataDetails(carbonLoadModel) |
| } |
| // reading the start time of data load. |
| val loadStartTime = CarbonUpdateUtil.readCurrentTime(); |
| carbonLoadModel.setFactTimeStamp(loadStartTime) |
| |
| val isCompactionTriggerByDDl = true |
| val compactionModel = CompactionModel(compactionSize, |
| compactionType, |
| carbonTable, |
| isCompactionTriggerByDDl |
| ) |
| |
| val isConcurrentCompactionAllowed = CarbonProperties.getInstance() |
| .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, |
| CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION |
| ) |
| .equalsIgnoreCase("true") |
| |
| // if system level compaction is enabled then only one compaction can run in the system |
| // if any other request comes at this time then it will create a compaction request file. |
| // so that this will be taken up by the compaction process which is executing. |
| if (!isConcurrentCompactionAllowed) { |
| LOGGER.info("System level compaction lock is enabled.") |
| handleCompactionForSystemLocking(sqlContext, |
| carbonLoadModel, |
| storePath, |
| storeLocation, |
| compactionType, |
| carbonTable, |
| compactionModel |
| ) |
| } else { |
| // normal flow of compaction |
| val lock = CarbonLockFactory |
| .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, |
| LockUsage.COMPACTION_LOCK |
| ) |
| |
| if (lock.lockWithRetries()) { |
| LOGGER.info("Acquired the compaction lock for table" + |
| s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") |
| try { |
| startCompactionThreads(sqlContext, |
| carbonLoadModel, |
| storePath, |
| storeLocation, |
| compactionModel, |
| lock |
| ) |
| } catch { |
| case e: Exception => |
| LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }") |
| lock.unlock() |
| } |
| } else { |
| LOGGER.audit("Not able to acquire the compaction lock for table " + |
| s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") |
| LOGGER.error(s"Not able to acquire the compaction lock for table" + |
| s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") |
| sys.error("Table is already locked for compaction. Please try after some time.") |
| } |
| } |
| } |
| |
| def handleCompactionForSystemLocking(sqlContext: SQLContext, |
| carbonLoadModel: CarbonLoadModel, |
| storePath: String, |
| storeLocation: String, |
| compactionType: CompactionType, |
| carbonTable: CarbonTable, |
| compactionModel: CompactionModel): Unit = { |
| val lock = CarbonLockFactory |
| .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER, |
| LockUsage.SYSTEMLEVEL_COMPACTION_LOCK |
| ) |
| if (lock.lockWithRetries()) { |
| LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" + |
| s".${ carbonLoadModel.getTableName }") |
| try { |
| startCompactionThreads(sqlContext, |
| carbonLoadModel, |
| storePath, |
| storeLocation, |
| compactionModel, |
| lock |
| ) |
| } catch { |
| case e: Exception => |
| LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }") |
| lock.unlock() |
| // if the compaction is a blocking call then only need to throw the exception. |
| if (compactionModel.isDDLTrigger) { |
| throw e |
| } |
| } |
| } else { |
| LOGGER.audit("Not able to acquire the system level compaction lock for table " + |
| s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") |
| LOGGER.error("Not able to acquire the compaction lock for table " + |
| s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") |
| CarbonCompactionUtil |
| .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType) |
| // do sys error only in case of DDL trigger. |
| if (compactionModel.isDDLTrigger) { |
| sys.error("Compaction is in progress, compaction request for table " + |
| s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" + |
| " is in queue.") |
| } else { |
| LOGGER.error("Compaction is in progress, compaction request for table " + |
| s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" + |
| " is in queue.") |
| } |
| } |
| } |
| |
  /**
   * Executes the compaction for the triggering table and, when concurrent
   * compaction is disabled, drains any queued "compaction required" requests of
   * other tables before returning.
   *
   * NOTE(review): despite the name, this call is BLOCKING — the thread's run()
   * is invoked directly (see bottom), not start(). The executor and the given
   * compactionLock are always released in the finally block.
   *
   * @param compactionLock lock already acquired by the caller; unlocked here on completion
   */
  def startCompactionThreads(sqlContext: SQLContext,
      carbonLoadModel: CarbonLoadModel,
      storePath: String,
      storeLocation: String,
      compactionModel: CompactionModel,
      compactionLock: ICarbonLock): Unit = {
    // Single worker: compactions are executed one at a time.
    val executor: ExecutorService = Executors.newFixedThreadPool(1)
    // update the updated table status.
    CommonUtil.readLoadMetadataDetails(carbonLoadModel)
    val compactionThread = new Thread {
      override def run(): Unit = {

        try {
          // compaction status of the table which is triggered by the user.
          var triggeredCompactionStatus = false
          var exception: Exception = null
          try {
            DataManagementFunc.executeCompaction(carbonLoadModel: CarbonLoadModel,
              compactionModel: CompactionModel,
              executor, sqlContext, storeLocation
            )
            triggeredCompactionStatus = true
          } catch {
            case e: Exception =>
              // Remember the failure but keep going: queued requests of other
              // tables must still be processed; rethrown at the end (below).
              LOGGER.error(s"Exception in compaction thread ${ e.getMessage }")
              exception = e
          }
          // continue in case of exception also, check for all the tables.
          val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
            ).equalsIgnoreCase("true")

          if (!isConcurrentCompactionAllowed) {
            LOGGER.info("System level compaction lock is enabled.")
            // Tables whose request file could not be deleted; skipped on the
            // next lookup so we don't loop forever on the same table.
            val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
            var table: CarbonTable = CarbonCompactionUtil
              .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata.
                tablesMeta.map(_.carbonTable).toArray,
                skipCompactionTables.toList.asJava)
            // Drain queued compaction requests one table at a time.
            while (null != table) {
              LOGGER.info("Compaction request has been identified for table " +
                  s"${ table.getDatabaseName }." +
                  s"${ table.getFactTableName }")
              val metadataPath = table.getMetaDataFilepath
              // Compaction type is encoded in the request file on disk.
              val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)

              val newCarbonLoadModel = new CarbonLoadModel()
              DataManagementFunc.prepareCarbonLoadModel(table, newCarbonLoadModel)

              val compactionSize = CarbonDataMergerUtil
                .getCompactionSize(CompactionType.MAJOR_COMPACTION)

              // Inherits only the DDL-trigger flag from the original request.
              val newcompactionModel = CompactionModel(compactionSize,
                compactionType,
                table,
                compactionModel.isDDLTrigger
              )
              // proceed for compaction
              try {
                DataManagementFunc.executeCompaction(newCarbonLoadModel,
                  newcompactionModel,
                  executor, sqlContext, storeLocation
                )
              } catch {
                case e: Exception =>
                  LOGGER.error("Exception in compaction thread for table " +
                      s"${ table.getDatabaseName }." +
                      s"${ table.getFactTableName }")
                // not handling the exception. only logging as this is not the table triggered
                // by user.
              } finally {
                // delete the compaction required file in case of failure or success also.
                if (!CarbonCompactionUtil
                  .deleteCompactionRequiredFile(metadataPath, compactionType)) {
                  // if the compaction request file is not been able to delete then
                  // add those tables details to the skip list so that it wont be considered next.
                  skipCompactionTables.+=:(table.getCarbonTableIdentifier)
                  LOGGER.error("Compaction request file can not be deleted for table " +
                      s"${ table.getDatabaseName }." +
                      s"${ table.getFactTableName }")
                }
              }
              // ********* check again for all the tables.
              table = CarbonCompactionUtil
                .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
                  .tablesMeta.map(_.carbonTable).toArray, skipCompactionTables.asJava
                )
            }
            // giving the user his error for telling in the beeline if his triggered table
            // compaction is failed.
            if (!triggeredCompactionStatus) {
              throw new Exception("Exception in compaction " + exception.getMessage)
            }
          }
        } finally {
          executor.shutdownNow()
          DataManagementFunc.deletePartialLoadsInCompaction(carbonLoadModel)
          compactionLock.unlock()
        }
      }
    }
    // calling the run method of a thread to make the call as blocking call.
    // in the future we may make this as concurrent.
    compactionThread.run()
  }
| |
| def loadCarbonData(sqlContext: SQLContext, |
| carbonLoadModel: CarbonLoadModel, |
| storePath: String, |
| columnar: Boolean, |
| partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS, |
| result: Option[DictionaryServer], |
| overwriteTable: Boolean, |
| dataFrame: Option[DataFrame] = None, |
| updateModel: Option[UpdateTableModel] = None): Unit = { |
| val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable |
| val isAgg = false |
| // for handling of the segment Merging. |
| def handleSegmentMerging(): Unit = { |
| LOGGER.info(s"compaction need status is" + |
| s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }") |
| if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) { |
| LOGGER.audit(s"Compaction request received for table " + |
| s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") |
| val compactionSize = 0 |
| val isCompactionTriggerByDDl = false |
| val compactionModel = CompactionModel(compactionSize, |
| CompactionType.MINOR_COMPACTION, |
| carbonTable, |
| isCompactionTriggerByDDl |
| ) |
| var storeLocation = "" |
| val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf) |
| if (null != configuredStore && configuredStore.nonEmpty) { |
| storeLocation = configuredStore(Random.nextInt(configuredStore.length)) |
| } |
| if (storeLocation == null) { |
| storeLocation = System.getProperty("java.io.tmpdir") |
| } |
| storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() |
| |
| val isConcurrentCompactionAllowed = CarbonProperties.getInstance() |
| .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, |
| CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION |
| ) |
| .equalsIgnoreCase("true") |
| |
| if (!isConcurrentCompactionAllowed) { |
| |
| handleCompactionForSystemLocking(sqlContext, |
| carbonLoadModel, |
| storePath, |
| storeLocation, |
| CompactionType.MINOR_COMPACTION, |
| carbonTable, |
| compactionModel |
| ) |
| } else { |
| val lock = CarbonLockFactory |
| .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, |
| LockUsage.COMPACTION_LOCK |
| ) |
| |
| if (lock.lockWithRetries()) { |
| LOGGER.info("Acquired the compaction lock.") |
| try { |
| startCompactionThreads(sqlContext, |
| carbonLoadModel, |
| storePath, |
| storeLocation, |
| compactionModel, |
| lock |
| ) |
| } catch { |
| case e: Exception => |
| LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }") |
| lock.unlock() |
| throw e |
| } |
| } else { |
| LOGGER.audit("Not able to acquire the compaction lock for table " + |
| s"${ carbonLoadModel.getDatabaseName }.${ |
| carbonLoadModel |
| .getTableName |
| }") |
| LOGGER.error("Not able to acquire the compaction lock for table " + |
| s"${ carbonLoadModel.getDatabaseName }.${ |
| carbonLoadModel |
| .getTableName |
| }") |
| } |
| } |
| } |
| } |
| |
| def updateStatus(loadStatus: String, |
| stat: Array[(String, (LoadMetadataDetails, ExecutionErrors))]) = { |
| val metadataDetails = if (stat != null && stat(0) != null) { |
| stat(0)._2._1 |
| } else { |
| new LoadMetadataDetails |
| } |
| CarbonLoaderUtil |
| .populateNewLoadMetaEntry(metadataDetails, |
| loadStatus, |
| carbonLoadModel.getFactTimeStamp, |
| true) |
| val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, |
| carbonLoadModel, false, overwriteTable) |
| if (!status) { |
| val errorMessage = "Dataload failed due to failure in table status updation." |
| LOGGER.audit("Data load is failed for " + |
| s"${ carbonLoadModel.getDatabaseName }.${ |
| carbonLoadModel |
| .getTableName |
| }") |
| LOGGER.error("Dataload failed due to failure in table status updation.") |
| throw new Exception(errorMessage) |
| } |
| } |
| |
| try { |
| LOGGER.audit(s"Data load request has been received for table" + |
| s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") |
| // Check if any load need to be deleted before loading new data |
| DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName, |
| carbonLoadModel.getTableName, storePath, false, carbonTable) |
| // get partition way from configuration |
| // val isTableSplitPartition = CarbonProperties.getInstance().getProperty( |
| // CarbonCommonConstants.TABLE_SPLIT_PARTITION, |
| // CarbonCommonConstants.TABLE_SPLIT_PARTITION_DEFAULT_VALUE).toBoolean |
| val isTableSplitPartition = false |
| var blocksGroupBy: Array[(String, Array[BlockDetails])] = null |
| var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null |
| var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null |
| |
| def loadDataFile(): Unit = { |
| if (isTableSplitPartition) { |
| /* |
| * when data handle by table split partition |
| * 1) get partition files, direct load or not will get the different files path |
| * 2) get files blocks by using SplitUtils |
| * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy |
| */ |
| var splits = Array[TableSplit]() |
| if (carbonLoadModel.isDirectLoad) { |
| // get all table Splits, this part means files were divide to different partitions |
| splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath) |
| // get all partition blocks from file list |
| blocksGroupBy = splits.map { |
| split => |
| val pathBuilder = new StringBuilder() |
| for (path <- split.getPartition.getFilesPath.asScala) { |
| pathBuilder.append(path).append(",") |
| } |
| if (pathBuilder.nonEmpty) { |
| pathBuilder.substring(0, pathBuilder.size - 1) |
| } |
| (split.getPartition.getUniqueID, SparkUtil.getSplits(pathBuilder.toString(), |
| sqlContext.sparkContext |
| )) |
| } |
| } else { |
| // get all table Splits,when come to this, means data have been partition |
| splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName, |
| carbonLoadModel.getTableName, null) |
| // get all partition blocks from factFilePath/uniqueID/ |
| blocksGroupBy = splits.map { |
| split => |
| val pathBuilder = new StringBuilder() |
| pathBuilder.append(carbonLoadModel.getFactFilePath) |
| if (!carbonLoadModel.getFactFilePath.endsWith("/") |
| && !carbonLoadModel.getFactFilePath.endsWith("\\")) { |
| pathBuilder.append("/") |
| } |
| pathBuilder.append(split.getPartition.getUniqueID).append("/") |
| (split.getPartition.getUniqueID, |
| SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext)) |
| } |
| } |
| } else { |
| /* |
| * when data load handle by node partition |
| * 1)clone the hadoop configuration,and set the file path to the configuration |
| * 2)use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get splits,size info |
| * 3)use CarbonLoaderUtil.nodeBlockMapping to get mapping info of node and block, |
| * for locally writing carbondata files(one file one block) in nodes |
| * use NewCarbonDataLoadRDD to load data and write to carbondata files |
| */ |
| val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration) |
| // FileUtils will skip file which is no csv, and return all file path which split by ',' |
| val filePaths = carbonLoadModel.getFactFilePath |
| hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths) |
| hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true") |
| hadoopConfiguration.set("io.compression.codecs", |
| """org.apache.hadoop.io.compress.GzipCodec, |
| org.apache.hadoop.io.compress.DefaultCodec, |
| org.apache.hadoop.io.compress.BZip2Codec""".stripMargin) |
| |
| CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration) |
| |
| val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat |
| val jobContext = new Job(hadoopConfiguration) |
| val rawSplits = inputFormat.getSplits(jobContext).toArray |
| val blockList = rawSplits.map { inputSplit => |
| val fileSplit = inputSplit.asInstanceOf[FileSplit] |
| new TableBlockInfo(fileSplit.getPath.toString, |
| fileSplit.getStart, "1", |
| fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null |
| ).asInstanceOf[Distributable] |
| } |
| // group blocks to nodes, tasks |
| val startTime = System.currentTimeMillis |
| val activeNodes = DistributionUtil |
| .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext) |
| val nodeBlockMapping = |
| CarbonLoaderUtil |
| .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala |
| .toSeq |
| val timeElapsed: Long = System.currentTimeMillis - startTime |
| LOGGER.info("Total Time taken in block allocation: " + timeElapsed) |
| LOGGER.info(s"Total no of blocks: ${ blockList.length }, " + |
| s"No.of Nodes: ${nodeBlockMapping.size}") |
| var str = "" |
| nodeBlockMapping.foreach(entry => { |
| val tableBlock = entry._2 |
| str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size() |
| tableBlock.asScala.foreach(tableBlockInfo => |
| if (!tableBlockInfo.getLocations.exists(hostentry => |
| hostentry.equalsIgnoreCase(entry._1) |
| )) { |
| str = str + " , mismatch locations: " + tableBlockInfo.getLocations |
| .foldLeft("")((a, b) => a + "," + b) |
| } |
| ) |
| str = str + "\n" |
| } |
| ) |
| LOGGER.info(str) |
| blocksGroupBy = nodeBlockMapping.map(entry => { |
| val blockDetailsList = |
| entry._2.asScala.map(distributable => { |
| val tableBlock = distributable.asInstanceOf[TableBlockInfo] |
| new BlockDetails(new Path(tableBlock.getFilePath), |
| tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations |
| ) |
| }).toArray |
| (entry._1, blockDetailsList) |
| } |
| ).toArray |
| } |
| |
| status = new NewCarbonDataLoadRDD(sqlContext.sparkContext, |
| new DataLoadResultImpl(), |
| carbonLoadModel, |
| blocksGroupBy, |
| isTableSplitPartition).collect() |
| } |
| |
| def loadDataFrame(): Unit = { |
| try { |
| val rdd = dataFrame.get.rdd |
| val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p => |
| DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host) |
| }.distinct.size |
| val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData, |
| sqlContext.sparkContext) |
| val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct) |
| var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length |
| numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length)) |
| val coalesceRdd = rdd.coalesce(numPartitions, shuffle = false) |
| |
| status = new NewDataFrameLoaderRDD(sqlContext.sparkContext, |
| new DataLoadResultImpl(), |
| carbonLoadModel, |
| newRdd).collect() |
| } catch { |
| case ex: Exception => |
| LOGGER.error(ex, "load data frame failed") |
| throw ex |
| } |
| } |
| |
    // Executes the data-load part of an UPDATE: updated rows are keyed by their
    // target segment, partitioned so each segment gets `parallelism` partitions,
    // and each partition is loaded as a new task of its segment. Results are
    // collected into the enclosing `res` variable.
    def loadDataFrameForUpdate(): Unit = {
      val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate

      // Runs the actual load for one partition of updated rows belonging to
      // segment `key`, writing them as task number `taskNo` of that segment.
      // Returns a single-element iterator carrying (statusId, (details, errors));
      // load errors from NoRetryException are reported through the result rather
      // than thrown, so bad records do not abort the whole update.
      def triggerDataLoadForSegment(key: String, taskNo: Int,
          iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
        val rddResult = new updateResultImpl()
        val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
        val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
          var partitionID = "0"
          val loadMetadataDetails = new LoadMetadataDetails
          val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
          var uniqueLoadStatusId = ""
          // NOTE(review): this try block executes eagerly when the Iterator is
          // constructed (it is part of the anonymous class initialiser), not on
          // the first next() call.
          try {
            val segId = key
            val index = taskNo
            uniqueLoadStatusId = carbonLoadModel.getTableName +
                                 CarbonCommonConstants.UNDERSCORE +
                                 (index + "_0")

            // convert timestamp
            val timeStampInLong = updateModel.get.updatedTimeStamp + ""
            loadMetadataDetails.setPartitionCount(partitionID)
            loadMetadataDetails.setLoadName(segId)
            // Default to FAILURE; flipped to SUCCESS just before the load runs.
            loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
            carbonLoadModel.setPartitionId(partitionID)
            carbonLoadModel.setSegmentId(segId)
            carbonLoadModel.setTaskNo(String.valueOf(index))
            carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)

            // During Block Spill case Increment of File Count and proper adjustment of Block
            // naming is only done when AbstractFactDataWriter.java : initializeWriter get
            // CarbondataFileName as null. For handling Block Spill not setting the
            // CarbondataFileName in case of Update.
            // carbonLoadModel.setCarbondataFileName(newBlockName)

            // storeLocation = CarbonDataLoadRDD.initialize(carbonLoadModel, index)
            loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
            UpdateDataLoad.DataLoadForUpdate(segId,
              index,
              iter,
              carbonLoadModel,
              loadMetadataDetails)
          } catch {
            case e: NoRetryException =>
              // Bad records: report PARTIAL_SUCCESS via executionErrors instead
              // of failing the task.
              loadMetadataDetails
                .setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
              executionErrors.failureCauses = FailureCauses.BAD_RECORDS
              executionErrors.errorMsg = e.getMessage
              LOGGER.info("Bad Record Found")
            case e: Exception =>
              LOGGER.info("DataLoad failure")
              LOGGER.error(e)
              throw e
          }

          var finished = false

          override def hasNext: Boolean = !finished

          // Single-shot iterator: yields the load result exactly once.
          override def next(): (String, (LoadMetadataDetails, ExecutionErrors)) = {
            finished = true
            rddResult
              .getKey(uniqueLoadStatusId,
                (loadMetadataDetails, executionErrors))
          }
        }
        resultIter
      }

      val updateRdd = dataFrame.get.rdd

      // return directly if no rows to update
      val noRowsToUpdate = updateRdd.isEmpty()
      if (noRowsToUpdate) {
        res = Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]()
        return
      }

      // splitting as (key, value) i.e., (segment, updatedRows)
      // The segment id is carried in the last column of each row; strip it off.
      val keyRDD = updateRdd.map(row =>
        (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))

      val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
        carbonTable.getMetaDataFilepath)
      val segmentIds = loadMetadataDetails.map(_.getLoadName)
      val segmentIdIndex = segmentIds.zipWithIndex.toMap
      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
        carbonTable.getCarbonTableIdentifier)
      // Highest existing task id per segment, so new tasks get fresh numbers.
      val segmentId2maxTaskNo = segmentIds.map { segId =>
        (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath))
      }.toMap

      // Maps each segment to `parallelism` consecutive partition ids:
      // partitionId = segmentIndex * parallelism + random(parallelism).
      class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
        extends org.apache.spark.Partitioner {
        override def numPartitions: Int = segmentIdIndex.size * parallelism

        override def getPartition(key: Any): Int = {
          val segId = key.asInstanceOf[String]
          // partitionId
          segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
        }
      }

      val partitionByRdd = keyRDD.partitionBy(new SegmentPartitioner(segmentIdIndex,
        segmentUpdateParallelism))

      // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
      // so segmentIdIndex=partitionId/parallelism, this has been verified.
      res = partitionByRdd.map(_._2).mapPartitions { partition =>
        val partitionId = TaskContext.getPartitionId()
        val segIdIndex = partitionId / segmentUpdateParallelism
        val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
        val segId = segmentIds(segIdIndex)
        // New task number: one past the segment's current maximum, offset by
        // this partition's slot so concurrent partitions don't collide.
        val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1

        List(triggerDataLoadForSegment(segId, newTaskNo, partition).toList).toIterator
      }.collect()
    }
| |
| def loadDataForPartitionTable(): Unit = { |
| try { |
| val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel) |
| status = new PartitionTableDataLoaderRDD(sqlContext.sparkContext, |
| new DataLoadResultImpl(), |
| carbonLoadModel, |
| rdd).collect() |
| } catch { |
| case ex: Exception => |
| LOGGER.error(ex, "load data failed for partition table") |
| throw ex |
| } |
| } |
| |
| // create new segment folder in carbon store |
| if (!updateModel.isDefined) { |
| CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath, |
| carbonLoadModel.getSegmentId, carbonTable) |
| } |
| var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS |
| var errorMessage: String = "DataLoad failure" |
| var executorMessage: String = "" |
| val configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel) |
| val sortScope = CarbonDataProcessorUtil.getSortScope(configuration) |
| try { |
| if (updateModel.isDefined) { |
| loadDataFrameForUpdate() |
| } else if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) { |
| loadDataForPartitionTable() |
| } else if (configuration.isSortTable && |
| sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) { |
| LOGGER.audit("Using global sort for loading.") |
| status = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext, |
| dataFrame, carbonLoadModel) |
| } else if (dataFrame.isDefined) { |
| loadDataFrame() |
| } else { |
| loadDataFile() |
| } |
| if (updateModel.isDefined) { |
| |
| res.foreach(resultOfSeg => resultOfSeg.foreach( |
| resultOfBlock => { |
| if (resultOfBlock._2._1.getLoadStatus |
| .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) { |
| loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE |
| if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) { |
| updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE |
| updateModel.get.executorErrors.errorMsg = "Failure in the Executor." |
| } |
| else { |
| updateModel.get.executorErrors = resultOfBlock._2._2 |
| } |
| } else if (resultOfBlock._2._1.getLoadStatus |
| .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) { |
| loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS |
| updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses |
| updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg |
| } |
| } |
| )) |
| |
| } |
| else { |
| val newStatusMap = scala.collection.mutable.Map.empty[String, String] |
| if (status.nonEmpty) { |
| status.foreach { eachLoadStatus => |
| val state = newStatusMap.get(eachLoadStatus._1) |
| state match { |
| case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) => |
| newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus) |
| case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) |
| if eachLoadStatus._2._1.getLoadStatus == |
| CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS => |
| newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus) |
| case _ => |
| newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus) |
| } |
| } |
| |
| newStatusMap.foreach { |
| case (key, value) => |
| if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) { |
| loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE |
| } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS && |
| !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) { |
| loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS |
| } |
| } |
| } else { |
| loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE |
| } |
| |
| if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE && |
| partitionStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) { |
| loadStatus = partitionStatus |
| } |
| } |
| } catch { |
| case ex: Throwable => |
| loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE |
| ex match { |
| case sparkException: SparkException => |
| if (sparkException.getCause.isInstanceOf[DataLoadingException] || |
| sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) { |
| executorMessage = sparkException.getCause.getMessage |
| errorMessage = errorMessage + ": " + executorMessage |
| } |
| case _ => |
| executorMessage = ex.getCause.getMessage |
| errorMessage = errorMessage + ": " + executorMessage |
| } |
| LOGGER.info(errorMessage) |
| LOGGER.error(ex) |
| } |
    // handle the table status file update for the update command.
| if (updateModel.isDefined) { |
| |
| if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) { |
| // updateModel.get.executorErrors.errorMsg = errorMessage |
| if (updateModel.get.executorErrors.failureCauses == FailureCauses.NONE) { |
| updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE |
| if (null != executorMessage && !executorMessage.isEmpty) { |
| updateModel.get.executorErrors.errorMsg = executorMessage |
| } else { |
| updateModel.get.executorErrors.errorMsg = "Update failed as the data load has failed." |
| } |
| } |
| return |
| } else if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS && |
| updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS && |
| carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) { |
| return |
| } else { |
        // success case: handle the update of the table status file.
| val segmentDetails = new util.HashSet[String]() |
| |
| var resultSize = 0 |
| |
| res.foreach(resultOfSeg => { |
| resultSize = resultSize + resultOfSeg.size |
| resultOfSeg.foreach( |
| resultOfBlock => { |
| segmentDetails.add(resultOfBlock._2._1.getLoadName) |
| } |
| )} |
| ) |
| |
        // this means that the update doesn't have any records to update,
        // so there is no need to update the table status file.
| if (resultSize == 0) { |
| LOGGER.audit("Data update is successful with 0 rows updation for " + |
| s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}") |
| return |
| } |
| |
| if ( |
| CarbonUpdateUtil |
| .updateTableMetadataStatus(segmentDetails, |
| carbonTable, |
| updateModel.get.updatedTimeStamp + "", |
| true, |
| new util.ArrayList[String](0))) { |
| LOGGER.audit("Data update is successful for " + |
| s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") |
| } |
| else { |
| val errorMessage = "Data update failed due to failure in table status updation." |
| LOGGER.audit("Data update is failed for " + |
| s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}") |
| LOGGER.error("Data update failed due to failure in table status updation.") |
| updateModel.get.executorErrors.errorMsg = errorMessage |
| updateModel.get.executorErrors.failureCauses = FailureCauses |
| .STATUS_FILE_UPDATION_FAILURE |
| return |
| } |
| |
| } |
| |
| return |
| } |
| if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) { |
| LOGGER.info("********starting clean up**********") |
| CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt) |
| LOGGER.info("********clean up done**********") |
| LOGGER.audit(s"Data load is failed for " + |
| s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") |
| LOGGER.warn("Cannot write load metadata file as data load failed") |
| updateStatus(loadStatus, status) |
| throw new Exception(errorMessage) |
| } else { |
| // check if data load fails due to bad record and throw data load failure due to |
| // bad record exception |
| if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS && |
| status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS && |
| carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) { |
| LOGGER.info("********starting clean up**********") |
| CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt) |
| LOGGER.info("********clean up done**********") |
| LOGGER.audit(s"Data load is failed for " + |
| s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") |
| updateStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE, status) |
| throw new Exception(status(0)._2._2.errorMsg) |
| } |
| if (!isAgg) { |
| writeDictionary(carbonLoadModel, result) |
| updateStatus(loadStatus, status) |
| } else if (!carbonLoadModel.isRetentionRequest) { |
| // TODO : Handle it |
| LOGGER.info("********Database updated**********") |
| } |
| |
| if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) { |
| LOGGER.audit("Data load is partially successful for " + |
| s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") |
| } else { |
| LOGGER.audit("Data load is successful for " + |
| s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") |
| } |
| try { |
| // compaction handling |
| handleSegmentMerging() |
| } catch { |
| case e: Exception => |
| throw new Exception( |
| "Dataload is success. Auto-Compaction has failed. Please check logs.") |
| } |
| } |
| } |
| |
| } |
| |
| /** |
| * repartition the input data for partiton table. |
| * @param sqlContext |
| * @param dataFrame |
| * @param carbonLoadModel |
| * @return |
| */ |
| private def repartitionInputData(sqlContext: SQLContext, |
| dataFrame: Option[DataFrame], |
| carbonLoadModel: CarbonLoadModel): RDD[Row] = { |
| val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable |
| val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) |
| val partitionColumn = partitionInfo.getColumnSchemaList.get(0).getColumnName |
| val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType |
| val columns = carbonLoadModel.getCsvHeaderColumns |
| var partitionColumnIndex = -1 |
| breakable { |
| for (i <- 0 until columns.length) { |
| if (partitionColumn.equalsIgnoreCase(columns(i))) { |
| partitionColumnIndex = i |
| break |
| } |
| } |
| } |
| if (partitionColumnIndex == -1) { |
| throw new DataLoadingException("Partition column not found.") |
| } |
| |
| val dateFormatMap = CarbonDataProcessorUtil.getDateFormatMap(carbonLoadModel.getDateFormat()) |
| val specificFormat = Option(dateFormatMap.get(partitionColumn.toLowerCase)) |
| val timeStampFormat = if (specificFormat.isDefined) { |
| new SimpleDateFormat(specificFormat.get) |
| } else { |
| val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants |
| .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) |
| new SimpleDateFormat(timestampFormatString) |
| } |
| |
| val dateFormat = if (specificFormat.isDefined) { |
| new SimpleDateFormat(specificFormat.get) |
| } else { |
| val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants |
| .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) |
| new SimpleDateFormat(dateFormatString) |
| } |
| |
| // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions |
| val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) { |
| // input data from DataFrame |
| val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 |
| val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2 |
| val serializationNullFormat = |
| carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) |
| dataFrame.get.rdd.map { row => |
| if (null != row && row.length > partitionColumnIndex && |
| null != row.get(partitionColumnIndex)) { |
| (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat, |
| delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row) |
| } else { |
| (null, row) |
| } |
| } |
| } else { |
| // input data from csv files |
| val hadoopConfiguration = new Configuration() |
| CommonUtil.configureCSVInputFormat(hadoopConfiguration, carbonLoadModel) |
| hadoopConfiguration.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath) |
| val columnCount = columns.length |
| new NewHadoopRDD[NullWritable, StringArrayWritable]( |
| sqlContext.sparkContext, |
| classOf[CSVInputFormat], |
| classOf[NullWritable], |
| classOf[StringArrayWritable], |
| hadoopConfiguration |
| ).map { currentRow => |
| if (null == currentRow || null == currentRow._2) { |
| val row = new StringArrayRow(new Array[String](columnCount)) |
| (null, row) |
| } else { |
| val row = new StringArrayRow(new Array[String](columnCount)) |
| val values = currentRow._2.get() |
| if (values != null && values.length > partitionColumnIndex) { |
| (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get())) |
| } else { |
| (null, row.setValues(currentRow._2.get())) |
| } |
| } |
| } |
| } |
| |
| val partitioner = PartitionFactory.getPartitioner(partitionInfo) |
| if (partitionColumnDataType == DataTypes.STRING) { |
| if (partitionInfo.getPartitionType == PartitionType.RANGE) { |
| inputRDD.map { row => (ByteUtil.toBytes(row._1), row._2) } |
| .partitionBy(partitioner) |
| .map(_._2) |
| } else { |
| inputRDD.partitionBy(partitioner) |
| .map(_._2) |
| } |
| } else { |
| inputRDD.map { row => |
| (PartitionUtil.getDataBasedOnDataType(row._1, partitionColumnDataType, timeStampFormat, |
| dateFormat), row._2) |
| } |
| .partitionBy(partitioner) |
| .map(_._2) |
| } |
| } |
| |
| private def writeDictionary(carbonLoadModel: CarbonLoadModel, |
| result: Option[DictionaryServer]) = { |
| // write dictionary file and shutdown dictionary server |
| val uniqueTableName: String = s"${ carbonLoadModel.getDatabaseName }_${ |
| carbonLoadModel.getTableName }" |
| result match { |
| case Some(server) => |
| try { |
| server.writeTableDictionary(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable |
| .getCarbonTableIdentifier.getTableId) |
| } catch { |
| case ex: Exception => |
| LOGGER.error(s"Error while writing dictionary file for $uniqueTableName") |
| throw new Exception("Dataload failed due to error while writing dictionary file!") |
| } |
| case _ => |
| } |
| } |
| |
| } |