| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.sql.execution.command |
| |
| import java.util |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable.ListBuffer |
| |
| import org.apache.spark.rdd.RDD |
| import org.apache.spark.sql._ |
| import org.apache.spark.sql.catalyst.TableIdentifier |
| import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} |
| import org.apache.spark.sql.execution.RunnableCommand |
| import org.apache.spark.sql.execution.datasources.LogicalRelation |
| import org.apache.spark.storage.StorageLevel |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory |
| import org.apache.carbondata.core.constants.CarbonCommonConstants |
| import org.apache.carbondata.core.datastore.impl.FileFactory |
| import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} |
| import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable |
| import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum} |
| import org.apache.carbondata.core.mutate.data.RowCountDetailsVO |
| import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUpdateStatusManager} |
| import org.apache.carbondata.core.util.CarbonProperties |
| import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} |
| import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl |
| import org.apache.carbondata.processing.exception.MultipleMatchingException |
| import org.apache.carbondata.processing.loading.FailureCauses |
| import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType} |
| import org.apache.carbondata.spark.DeleteDelataResultImpl |
| import org.apache.carbondata.spark.util.QueryPlanUtil |
| |
| |
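// Illustrative SQL (CarbonData IUD syntax) handled by the commands below:
//   DELETE FROM t WHERE c1 = 'a'               -- executed via ProjectForDeleteCommand
//   UPDATE t SET (c2) = ('b') WHERE c1 = 'a'   -- executed via ProjectForUpdateCommand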
| /** |
| * IUD update delete and compaction framework. |
| * |
| */ |
| |
| private[sql] case class ProjectForDeleteCommand( |
| plan: LogicalPlan, |
| identifier: Seq[String], |
| timestamp: String) extends RunnableCommand { |
| |
| val LOG = LogServiceFactory.getLogService(this.getClass.getName) |
| var horizontalCompactionFailed = false |
| |
| override def run(sqlContext: SQLContext): Seq[Row] = { |
| |
| val dataFrame = DataFrame(sqlContext, plan) |
| val dataRdd = dataFrame.rdd |
| |
| val relation = CarbonEnv.get.carbonMetastore |
| .lookupRelation1(deleteExecution.getTableIdentifier(identifier))(sqlContext). |
| asInstanceOf[CarbonRelation] |
| val carbonTable = relation.tableMeta.carbonTable |
| val metadataLock = CarbonLockFactory |
| .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, |
| LockUsage.METADATA_LOCK) |
| var lockStatus = false |
| try { |
| lockStatus = metadataLock.lockWithRetries() |
| LOG.audit(s" Delete data request has been received " + |
| s"for ${ relation.databaseName }.${ relation.tableName }.") |
| if (lockStatus) { |
| LOG.info("Successfully able to get the table metadata file lock") |
| } |
| else { |
| throw new Exception("Table is locked for deletion. Please try after some time") |
| } |
| val tablePath = CarbonStorePath.getCarbonTablePath( |
| carbonTable.getStorePath, |
| carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier) |
      val executorErrors = new ExecutionErrors(FailureCauses.NONE, "")
| |
      // clean up any stale IUD delta files before executing the delete.
| CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) |
| |
| if (deleteExecution |
| .deleteDeltaExecution(identifier, sqlContext, dataRdd, timestamp, relation, |
| false, executorErrors)) { |
| // call IUD Compaction. |
| IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = false) |
| } |
| } catch { |
| case e: HorizontalCompactionException => |
| LOG.error("Delete operation passed. Exception in Horizontal Compaction." + |
| " Please check logs. " + e.getMessage) |
| CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString) |
| |
| case e: Exception => |
| LOG.error("Exception in Delete data operation " + e.getMessage) |
        // start clean-up: in case of failure, remove all related delete delta files
        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)

        // null check is required because for executor errors the message is sometimes null
        if (null != e.getMessage) {
          sys.error("Delete data operation failed. " + e.getMessage)
        } else {
          sys.error("Delete data operation failed. Please check logs.")
        }
| } finally { |
| if (lockStatus) { |
| CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK) |
| } |
| } |
| Seq.empty |
| } |
| } |
| |
| private[sql] case class ProjectForUpdateCommand( |
| plan: LogicalPlan, tableIdentifier: Seq[String]) extends RunnableCommand { |
| val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName) |
| |
| override def run(sqlContext: SQLContext): Seq[Row] = { |
| |
    val res = plan.find {
      case relation: LogicalRelation =>
        relation.relation.isInstanceOf[CarbonDatasourceRelation]
      case _ => false
    }

    if (res.isEmpty) {
      return Seq.empty
    }
| |
| val relation = CarbonEnv.get.carbonMetastore |
| .lookupRelation1(deleteExecution.getTableIdentifier(tableIdentifier))(sqlContext). |
| asInstanceOf[CarbonRelation] |
| val carbonTable = relation.tableMeta.carbonTable |
| val metadataLock = CarbonLockFactory |
| .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, |
| LockUsage.METADATA_LOCK) |
| var lockStatus = false |
| // get the current time stamp which should be same for delete and update. |
| val currentTime = CarbonUpdateUtil.readCurrentTime |
| var dataFrame: DataFrame = null |
    val isPersistEnabledUserValue = CarbonProperties.getInstance
      .getProperty(CarbonCommonConstants.isPersistEnabled,
        CarbonCommonConstants.defaultValueIsPersistEnabled)
    var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean
    if (isPersistEnabledUserValue.equalsIgnoreCase("false")) {
      isPersistEnabled = false
    } else if (isPersistEnabledUserValue.equalsIgnoreCase("true")) {
      isPersistEnabled = true
    }
| try { |
| lockStatus = metadataLock.lockWithRetries() |
      if (lockStatus) {
        logInfo("Successfully acquired the table metadata file lock")
      } else {
        throw new Exception("Table is locked for update operation. Please try again later.")
      }
| val tablePath = CarbonStorePath.getCarbonTablePath( |
| carbonTable.getStorePath, |
| carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier) |
| // Get RDD. |
      dataFrame = if (isPersistEnabled) {
        DataFrame(sqlContext, plan).persist(StorageLevel.MEMORY_AND_DISK)
      } else {
        DataFrame(sqlContext, plan)
      }
      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
| |
| |
      // clean up any stale IUD delta files before executing the update.
| CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) |
| |
| // do delete operation. |
      deleteExecution.deleteDeltaExecution(tableIdentifier, sqlContext, dataFrame.rdd,
        currentTime.toString, relation, isUpdateOperation = true, executionErrors)
| |
      if (executionErrors.failureCauses != FailureCauses.NONE) {
| throw new Exception(executionErrors.errorMsg) |
| } |
| |
| // do update operation. |
| UpdateExecution.performUpdate(dataFrame, tableIdentifier, plan, |
| sqlContext, currentTime, executionErrors) |
| |
      if (executionErrors.failureCauses != FailureCauses.NONE) {
| throw new Exception(executionErrors.errorMsg) |
| } |
| |
| // Do IUD Compaction. |
| IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = true) |
| } |
| |
    catch {
      case e: HorizontalCompactionException =>
        LOGGER.error(
          "Update operation succeeded, but horizontal compaction failed. Please check logs. " + e)
        // in case of failure, clean all related delta files
        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)

      case e: Exception =>
        LOGGER.error("Exception in update operation. " + e)
        // start clean-up: in case of failure, remove all related delete delta files
        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime.toString)
        // end clean-up
        if (null != e.getMessage) {
          sys.error("Update operation failed. " + e.getMessage)
        } else if (null != e.getCause && null != e.getCause.getMessage) {
          sys.error("Update operation failed. " + e.getCause.getMessage)
        } else {
          sys.error("Update operation failed. Please check logs.")
        }
    }
| finally { |
| if (null != dataFrame && isPersistEnabled) { |
| dataFrame.unpersist() |
| } |
| if (lockStatus) { |
| CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK) |
| } |
| } |
| Seq.empty |
| } |
| } |
| |
| object IUDCommon { |
| |
| val LOG = LogServiceFactory.getLogService(this.getClass.getName) |
| |
| /** |
| * The method does horizontal compaction. After Update and Delete completion |
| * tryHorizontal compaction will be called. In case this method is called after |
| * Update statement then Update Compaction followed by Delete Compaction will be |
| * processed whereas for tryHorizontalCompaction called after Delete statement |
| * then only Delete Compaction will be processed. |
| * |
| * @param sqlContext |
| * @param carbonRelation |
| * @param isUpdateOperation |
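   *
   * Usage at the call sites in this file:
   * {{{
   *   // after UPDATE: update-delta compaction followed by delete-delta compaction
   *   IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = true)
   *   // after DELETE: delete-delta compaction only
   *   IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = false)
   * }}}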
| */ |
| def tryHorizontalCompaction(sqlContext: SQLContext, |
| carbonRelation: CarbonRelation, |
| isUpdateOperation: Boolean): Unit = { |
| |
    if (!CarbonDataMergerUtil.isHorizontalCompactionEnabled()) {
      return
    }
| |
    var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
    val carbonTable = carbonRelation.tableMeta.carbonTable
    val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
    val updateTimeStamp = System.currentTimeMillis()
    // the delete timestamp must differ from the update timestamp, since both are
    // committed to the status metadata and used for clean-up
    val deleteTimeStamp = updateTimeStamp + 1
| |
| // get the valid segments |
| var segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier) |
| |
| if (segLists == null || segLists.size() == 0) { |
| return |
| } |
| |
    // TODO: avoid reading the table status file from disk every time; better to load it
    // into memory once at the start and pass it along the routines. The constructor of
    // SegmentUpdateStatusManager reads both the table status file and the table update
    // status file and caches their content in segmentDetails and updateDetails respectively.
| val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager( |
| absTableIdentifier) |
| |
    if (isUpdateOperation) {
      // update statement: perform update-delta compaction first
| compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION |
| performUpdateDeltaCompaction(sqlContext, |
| compactionTypeIUD, |
| carbonTable, |
| absTableIdentifier, |
| segmentUpdateStatusManager, |
| updateTimeStamp, |
| segLists) |
| } |
| |
    // delete-delta compaction runs in both the update and delete flows
| compactionTypeIUD = CompactionType.IUD_DELETE_DELTA_COMPACTION |
| segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier) |
| if (segLists == null || segLists.size() == 0) { |
| return |
| } |
| |
| // Delete Compaction |
| performDeleteDeltaCompaction(sqlContext, |
| compactionTypeIUD, |
| carbonTable, |
| absTableIdentifier, |
| segmentUpdateStatusManager, |
| deleteTimeStamp, |
| segLists) |
| } |
| |
| /** |
| * Update Delta Horizontal Compaction. |
| * |
| * @param sqlContext |
| * @param compactionTypeIUD |
| * @param carbonTable |
| * @param absTableIdentifier |
| * @param segLists |
| */ |
| private def performUpdateDeltaCompaction(sqlContext: SQLContext, |
| compactionTypeIUD: CompactionType, |
| carbonTable: CarbonTable, |
| absTableIdentifier: AbsoluteTableIdentifier, |
| segmentUpdateStatusManager: SegmentUpdateStatusManager, |
| factTimeStamp: Long, |
| segLists: util.List[String]): Unit = { |
| val db = carbonTable.getDatabaseName |
| val table = carbonTable.getFactTableName |
| // get the valid segments qualified for update compaction. |
| val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists, |
| absTableIdentifier, |
| segmentUpdateStatusManager, |
| compactionTypeIUD) |
| |
| if (validSegList.size() == 0) { |
| return |
| } |
| |
| LOG.info(s"Horizontal Update Compaction operation started for [${db}.${table}].") |
| LOG.audit(s"Horizontal Update Compaction operation started for [${db}.${table}].") |
| |
| try { |
| // Update Compaction. |
| val altertablemodel = AlterTableModel(Option(carbonTable.getDatabaseName), |
| carbonTable.getFactTableName, |
| Some(segmentUpdateStatusManager), |
| CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString, |
| Some(factTimeStamp), |
| "") |
| |
| AlterTableCompaction(altertablemodel).run(sqlContext) |
| } |
| catch { |
| case e: Exception => |
| val msg = if (null != e.getMessage) { |
| e.getMessage |
| } else { |
| "Please check logs for more info" |
| } |
| throw new HorizontalCompactionException( |
| s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp) |
| } |
| LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].") |
| LOG.audit(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].") |
| } |
| |
| /** |
| * Delete Delta Horizontal Compaction. |
| * |
| * @param sqlContext |
| * @param compactionTypeIUD |
| * @param carbonTable |
| * @param absTableIdentifier |
| * @param segLists |
| */ |
| private def performDeleteDeltaCompaction(sqlContext: SQLContext, |
| compactionTypeIUD: CompactionType, |
| carbonTable: CarbonTable, |
| absTableIdentifier: AbsoluteTableIdentifier, |
| segmentUpdateStatusManager: SegmentUpdateStatusManager, |
| factTimeStamp: Long, |
| segLists: util.List[String]): Unit = { |
| |
| val db = carbonTable.getDatabaseName |
| val table = carbonTable.getFactTableName |
| val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists, |
| absTableIdentifier, |
| segmentUpdateStatusManager, |
| compactionTypeIUD) |
| |
| if (deletedBlocksList.size() == 0) { |
| return |
| } |
| |
| LOG.info(s"Horizontal Delete Compaction operation started for [${db}.${table}].") |
| LOG.audit(s"Horizontal Delete Compaction operation started for [${db}.${table}].") |
| |
| try { |
| |
| // Delete Compaction RDD |
| val rdd1 = sqlContext.sparkContext |
| .parallelize(deletedBlocksList.asScala.toSeq, deletedBlocksList.size()) |
| |
| val timestamp = factTimeStamp |
| val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails |
| val result = rdd1.mapPartitions(iter => |
| new Iterator[Seq[CarbonDataMergerUtilResult]] { |
| override def hasNext: Boolean = iter.hasNext |
| |
| override def next(): Seq[CarbonDataMergerUtilResult] = { |
| val segmentAndBlocks = iter.next |
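            // segmentAndBlocks has the form "<segmentId>/<blockName>"; split on the last '/'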
| val segment = segmentAndBlocks.substring(0, segmentAndBlocks.lastIndexOf("/")) |
| val blockName = segmentAndBlocks |
| .substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length) |
| |
| val result = CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName, |
| absTableIdentifier, |
| updateStatusDetails, |
| timestamp) |
| |
| result.asScala.toList |
| |
| } |
| }).collect |
| |
      val resultList = ListBuffer[CarbonDataMergerUtilResult]()
      result.foreach(_.foreach(resultList += _))
| |
| val updateStatus = CarbonDataMergerUtil.updateStatusFile(resultList.toList.asJava, |
| carbonTable, |
| timestamp.toString, |
| segmentUpdateStatusManager) |
      if (!updateStatus) {
        LOG.audit(s"Delete Compaction data operation failed for [${db}.${table}].")
        LOG.error("Delete Compaction data operation failed.")
        throw new HorizontalCompactionException(
          s"Horizontal Delete Compaction Failed for [${db}.${table}]." +
          s" Please check logs for more info.", factTimeStamp)
      } else {
        LOG.info(s"Horizontal Delete Compaction operation completed for [${db}.${table}].")
        LOG.audit(s"Horizontal Delete Compaction operation completed for [${db}.${table}].")
      }
| } |
| catch { |
| case e: Exception => |
| val msg = if (null != e.getMessage) { |
| e.getMessage |
| } else { |
| "Please check logs for more info" |
| } |
| throw new HorizontalCompactionException( |
| s"Horizontal Delete Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp) |
| } |
| } |
| } |
| |
| class HorizontalCompactionException( |
| message: String, |
| // required for cleanup |
| val compactionTimeStamp: Long) extends RuntimeException(message) { |
| } |
| |
| object deleteExecution { |
| val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) |
| |
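  /**
   * Builds a TableIdentifier from the parsed name parts, e.g.
   * Seq("db", "t") yields TableIdentifier("t", Some("db")) and
   * Seq("t") yields TableIdentifier("t", None).
   */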
| def getTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = { |
| if (tableIdentifier.size > 1) { |
| TableIdentifier(tableIdentifier(1), Some(tableIdentifier(0))) |
| } else { |
| TableIdentifier(tableIdentifier(0), None) |
| } |
| } |
| |
  def deleteDeltaExecution(identifier: Seq[String],
      sqlContext: SQLContext,
      dataRdd: RDD[Row],
      timestamp: String,
      relation: CarbonRelation,
      isUpdateOperation: Boolean,
      executorErrors: ExecutionErrors): Boolean = {
| |
| var res: Array[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] = null |
| val tableName = getTableIdentifier(identifier).table |
| val database = getDB.getDatabaseName(getTableIdentifier(identifier).database, sqlContext) |
| val relation = CarbonEnv.get.carbonMetastore |
| .lookupRelation1(getTableIdentifier(identifier))(sqlContext). |
| asInstanceOf[CarbonRelation] |
| |
    val storeLocation = relation.tableMeta.storePath
    val absoluteTableIdentifier: AbsoluteTableIdentifier =
      new AbsoluteTableIdentifier(storeLocation, relation.tableMeta.carbonTableIdentifier)
    val tablePath = CarbonStorePath.getCarbonTablePath(
      storeLocation, absoluteTableIdentifier.getCarbonTableIdentifier())
    val tableUpdateStatusPath = tablePath.getTableUpdateStatusFilePath
    val totalSegments =
      SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath).length
    val factPath = tablePath.getFactDir

    val carbonTable = relation.tableMeta.carbonTable
    var deleteStatus = true
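    // for UPDATE, project the incoming rows down to the implicit tupleId column only;
    // for DELETE, the incoming RDD already carries the tupleId column and is used as-is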
| val deleteRdd = if (isUpdateOperation) { |
| val schema = |
| org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField( |
| CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, |
| org.apache.spark.sql.types.StringType))) |
| val rdd = dataRdd |
| .map(row => Row(row.get(row.fieldIndex( |
| CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)))) |
| sqlContext.createDataFrame(rdd, schema).rdd |
| } else { |
| dataRdd |
| } |
| |
| val (carbonInputFormat, job) = |
| QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier) |
| |
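    // group the rows to delete by "<segmentId>/<blockName>", derived from each row's tupleId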
| val keyRdd = deleteRdd.map({ row => |
| val tupleId: String = row |
| .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) |
| val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId) |
| (key, row) |
| }).groupByKey() |
| |
| // if no loads are present then no need to do anything. |
| if (keyRdd.partitions.size == 0) { |
| return true |
| } |
| |
    val blockMappingVO = carbonInputFormat.getBlockRowCount(job, absoluteTableIdentifier)
| val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier) |
| CarbonUpdateUtil |
| .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr) |
| |
| val rowContRdd = sqlContext.sparkContext |
| .parallelize(blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq, |
| keyRdd.partitions.size) |
| |
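    // pair each block's row-count metadata with the group of rows to be deleted from it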
| val rdd = rowContRdd.join(keyRdd) |
| |
| res = rdd.mapPartitionsWithIndex( |
| (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) => |
| Iterator[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] { |
| |
| var result = List[(String, (SegmentUpdateDetails, ExecutionErrors))]() |
| while (records.hasNext) { |
| val ((key), (rowCountDetailsVO, groupedRows)) = records.next |
| result = result ++ |
| deleteDeltaFunc(index, |
| key, |
| groupedRows.toIterator, |
| timestamp, |
| rowCountDetailsVO) |
| |
| } |
| result |
| } |
| ).collect() |
| |
    // if no delete delta results were produced, there is nothing more to do.
| if (res.isEmpty) { |
| return true |
| } |
| |
    // update the new status file
    checkAndUpdateStatusFiles

    // all or none: update the status file only if the complete delete operation succeeded.
    def checkAndUpdateStatusFiles: Unit = {
| val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]() |
| val segmentDetails = new util.HashSet[String]() |
| res.foreach(resultOfSeg => resultOfSeg.foreach( |
| resultOfBlock => { |
| if (resultOfBlock._1.equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)) { |
| blockUpdateDetailsList.add(resultOfBlock._2._1) |
| segmentDetails.add(resultOfBlock._2._1.getSegmentName) |
| // if this block is invalid then decrement block count in map. |
| if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getStatus)) { |
| CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1, |
| blockMappingVO.getSegmentNumberOfBlockMapping) |
| } |
| } |
| else { |
| deleteStatus = false |
| // In case of failure , clean all related delete delta files |
| CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) |
| LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }") |
| val errorMsg = |
| "Delete data operation is failed due to failure in creating delete delta file for " + |
| "segment : " + resultOfBlock._2._1.getSegmentName + " block : " + |
| resultOfBlock._2._1.getBlockName |
| executorErrors.failureCauses = resultOfBlock._2._2.failureCauses |
| executorErrors.errorMsg = resultOfBlock._2._2.errorMsg |
| |
| if (executorErrors.failureCauses == FailureCauses.NONE) { |
| executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE |
| executorErrors.errorMsg = errorMsg |
| } |
| LOGGER.error(errorMsg) |
| return |
| } |
| } |
| ) |
| ) |
| |
| val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil |
| .getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping) |
| |
      // this is the delete flow, so there is no need to put the timestamp in the status file.
| if (CarbonUpdateUtil |
| .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, false) && |
| CarbonUpdateUtil |
| .updateTableMetadataStatus(segmentDetails, |
| carbonTable, |
| timestamp, |
| !isUpdateOperation, |
| listOfSegmentToBeMarkedDeleted) |
| ) { |
| LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }") |
| LOGGER.audit(s"Delete data operation is successful for ${ database }.${ tableName }") |
| } |
| else { |
| // In case of failure , clean all related delete delta files |
| CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) |
| |
| val errorMessage = "Delete data operation is failed due to failure " + |
| "in table status updation." |
| LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }") |
| LOGGER.error("Delete data operation is failed due to failure in table status updation.") |
| executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE |
| executorErrors.errorMsg = errorMessage |
| // throw new Exception(errorMessage) |
| } |
| } |
| |
| def deleteDeltaFunc(index: Int, |
| key: String, |
| iter: Iterator[Row], |
| timestamp: String, |
| rowCountDetailsVO: RowCountDetailsVO): |
| Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] = { |
| |
| val result = new DeleteDelataResultImpl() |
| var deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE |
| val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) |
| // here key = segment/blockName |
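    // e.g. (illustrative) key = "2/part-0-0" gives segmentId "2" and block "part-0-0"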
| val blockName = CarbonUpdateUtil |
| .getBlockName( |
| CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1))) |
| val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0) |
    val deleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName)
| val resultIter = new Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] { |
| val segmentUpdateDetails = new SegmentUpdateDetails() |
| var TID = "" |
| var countOfRows = 0 |
| try { |
| while (iter.hasNext) { |
| val oneRow = iter.next |
| TID = oneRow |
| .get(oneRow.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).toString |
| val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET) |
| val blockletId = CarbonUpdateUtil |
| .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID) |
| val pageId = Integer.parseInt(CarbonUpdateUtil |
| .getRequiredFieldFromTID(TID, TupleIdEnum.PAGE_ID)) |
            val isValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId)
            // stop the delete operation: the same row matched more than one input row
            if (!isValidOffset) {
              executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING
              executorErrors.errorMsg = "Multiple input rows matched for same row."
              throw new MultipleMatchingException("Multiple input rows matched for same row.")
            }
| countOfRows = countOfRows + 1 |
| } |
| |
| val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath) |
| val completeBlockName = CarbonTablePath |
| .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) + |
| CarbonCommonConstants.FACT_FILE_EXT) |
          val deleteDeltaPath = CarbonUpdateUtil
            .getDeleteDeltaFilePath(blockPath, blockName, timestamp)
          val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeltaPath,
            FileFactory.getFileType(deleteDeltaPath))

| segmentUpdateDetails.setBlockName(blockName) |
| segmentUpdateDetails.setActualBlockName(completeBlockName) |
| segmentUpdateDetails.setSegmentName(segmentId) |
| segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp) |
| segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp) |
| |
| val alreadyDeletedRows: Long = rowCountDetailsVO.getDeletedRowsInBlock |
| val totalDeletedRows: Long = alreadyDeletedRows + countOfRows |
| segmentUpdateDetails.setDeletedRowsInBlock(totalDeletedRows.toString) |
          if (totalDeletedRows == rowCountDetailsVO.getTotalNumberOfRows) {
            // every row in the block is deleted: mark the whole block for delete
            segmentUpdateDetails.setStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
          } else {
            // otherwise write the delete delta file for the deleted rows
            carbonDeleteWriter.write(deleteDeltaBlockDetails)
          }
| |
| deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS |
        } catch {
          case e: MultipleMatchingException =>
            LOGGER.audit(e.getMessage)
            LOGGER.error(e.getMessage)
          // don't rethrow here; the error is reported through executorErrors
          case e: Exception =>
            val errorMsg = s"Delete data operation failed for ${ database }.${ tableName }."
            LOGGER.audit(errorMsg)
            LOGGER.error(errorMsg + e.getMessage)
            throw e
        }
| |
| |
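        // single-shot iterator: yields exactly one (status, (details, errors)) tuple per block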
| var finished = false |
| |
        override def hasNext: Boolean = {
          if (!finished) {
            finished = true
            finished
          } else {
            !finished
          }
        }
| |
| override def next(): (String, (SegmentUpdateDetails, ExecutionErrors)) = { |
| finished = true |
| result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors)) |
| } |
| } |
| resultIter |
| } |
| true |
| } |
| } |
| |
| |
| |
| object UpdateExecution { |
| |
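  /**
   * Performs the write half of an UPDATE statement: the rows produced by the update
   * projection (whose old versions were already marked for delete by
   * deleteExecution.deleteDeltaExecution) are loaded back into the table through
   * LoadTable, carrying the shared update timestamp inside an UpdateTableModel.
   */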
| def performUpdate( |
| dataFrame: DataFrame, |
| tableIdentifier: Seq[String], |
| plan: LogicalPlan, |
| sqlContext: SQLContext, |
| currentTime: Long, |
| executorErrors: ExecutionErrors): Unit = { |
| |
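    // tableIdentifier is either Seq("db", "table") or Seq("table"); accept either form
    // when checking whether this relation is the update target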
| def isDestinationRelation(relation: CarbonDatasourceRelation): Boolean = { |
| |
| val tableName = relation.getTable() |
| val dbName = relation.getDatabaseName() |
| (tableIdentifier.size > 1 && |
| tableIdentifier(0) == dbName && |
| tableIdentifier(1) == tableName) || |
| (tableIdentifier(0) == tableName) |
| } |
    def getHeader(relation: CarbonDatasourceRelation, plan: LogicalPlan): String = {
      plan match {
        case Project(pList, _) =>
          pList
            .filter(field => !field.name
              .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
            .map { col =>
              if (col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION)) {
                // strip the updated-column suffix to recover the original column name
                col.name.substring(0,
                  col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))
              } else {
                col.name
              }
            }.mkString(",")
        // not a Project node: no header can be derived
        case _ => ""
      }
    }
| val ex = dataFrame.queryExecution.analyzed |
| val res = ex find { |
| case relation: LogicalRelation if (relation.relation.isInstanceOf[CarbonDatasourceRelation] && |
| isDestinationRelation(relation.relation |
| .asInstanceOf[CarbonDatasourceRelation])) => |
| true |
| case _ => false |
| } |
| val carbonRelation: CarbonDatasourceRelation = res match { |
| case Some(relation: LogicalRelation) => |
| relation.relation.asInstanceOf[CarbonDatasourceRelation] |
| case _ => sys.error("") |
| } |
| |
| val updateTableModel = UpdateTableModel(true, currentTime, executorErrors) |
| |
| val header = getHeader(carbonRelation, plan) |

    LoadTable(
      Some(carbonRelation.getDatabaseName()),
      carbonRelation.getTable(),
      null,
      Seq(),
      Map("fileheader" -> header),
      false,
      null,
      Some(dataFrame),
      Some(updateTableModel)).run(sqlContext)

    executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg
    executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses
| } |
| |
| } |