| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.spark.sql.execution.command.mutation.merge |
| |
| import java.util |
| import java.util.UUID |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable.ArrayBuffer |
| |
| import org.apache.hadoop.fs.Path |
| import org.apache.hadoop.mapreduce.{Job, JobID, TaskAttemptID, TaskID, TaskType} |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat |
| import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl |
| import org.apache.spark.rdd.RDD |
| import org.apache.spark.sql.{AnalysisException, CarbonUtils, Column, DataFrame, Dataset, Row, SparkSession} |
| import org.apache.spark.sql.avro.AvroFileFormatFactory |
| import org.apache.spark.sql.catalyst.InternalRow |
| import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute |
| import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo, Expression, GenericInternalRow, GenericRowWithSchema} |
| import org.apache.spark.sql.execution.LogicalRDD |
| import org.apache.spark.sql.execution.command.{DataCommand, ExecutionErrors, UpdateTableModel} |
| import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand |
| import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction |
| import org.apache.spark.sql.functions._ |
| import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} |
| import org.apache.spark.sql.util.SparkSQLUtil |
| import org.apache.spark.unsafe.types.UTF8String |
| import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LongAccumulator} |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory |
| import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} |
| import org.apache.carbondata.core.datastore.impl.FileFactory |
| import org.apache.carbondata.core.index.Segment |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable |
| import org.apache.carbondata.core.mutate.CarbonUpdateUtil |
| import org.apache.carbondata.core.statusmanager.SegmentStatusManager |
| import org.apache.carbondata.core.util.CarbonProperties |
| import org.apache.carbondata.core.util.path.CarbonTablePath |
| import org.apache.carbondata.events.OperationContext |
| import org.apache.carbondata.processing.loading.FailureCauses |
| import org.apache.carbondata.spark.util.CarbonSparkUtil |
| |
| /** |
 * This command merges the data of the source dataset into the target dataset backed by a
 * carbon table.
 * @param targetDsOri Target dataset to merge the data into. It should be backed by a carbon table
 * @param srcDS Source dataset, it can be any dataset.
 * @param mergeMatches It contains the join condition and the list of match conditions to apply.
| */ |
| case class CarbonMergeDataSetCommand( |
| targetDsOri: Dataset[Row], |
| srcDS: Dataset[Row], |
| var mergeMatches: MergeDataSetMatches) |
| extends DataCommand { |
| |
| val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) |
| |
| private val status_on_mergeds = "status_on_mergeds" |
| |
| /** |
   * It merges the data of the source dataset into the target dataset backed by a carbon table.
   * Basically it joins the source and target datasets (the join type is decided from the match
   * conditions) and applies the given conditions as a "case when" expression to get a status for
   * each row. The status decides whether the row should be inserted, updated or deleted.
   * It can also insert the history (update/delete) data into a history table.
| * |
| */ |
| override def processData(sparkSession: SparkSession): Seq[Row] = { |
| val relations = CarbonUtils.collectCarbonRelation(targetDsOri.logicalPlan) |
| // Target dataset must be backed by carbondata table. |
| if (relations.length != 1) { |
| throw new UnsupportedOperationException( |
| "Carbon table supposed to be present in merge dataset") |
| } |
| // validate the merge matches and actions. |
| validateMergeActions(mergeMatches, targetDsOri, sparkSession) |
| val carbonTable = relations.head.carbonRelation.carbonTable |
| val hasDelAction = mergeMatches.matchList |
| .exists(_.getActions.exists(_.isInstanceOf[DeleteAction])) |
| val hasUpdateAction = mergeMatches.matchList |
| .exists(_.getActions.exists(_.isInstanceOf[UpdateAction])) |
| val (insertHistOfUpdate, insertHistOfDelete) = getInsertHistoryStatus(mergeMatches) |
| // Get all the required columns of targetDS by going through all match conditions and actions. |
| val columns = getSelectExpressionsOnExistingDF(targetDsOri, mergeMatches, sparkSession) |
    // select only the required columns; it avoids shuffling a lot of unneeded data.
| val targetDs = targetDsOri.select(columns: _*) |
    // Update the update mapping with the unfilled columns. From here on the system assumes that
    // all mappings exist.
| mergeMatches = updateMappingIfNotExists(mergeMatches, targetDs) |
    // Let's generate all condition combinations as one column and add it as 'status'.
| val condition = generateStatusColumnWithAllCombinations(mergeMatches) |
| |
| // decide join type based on match conditions |
| val joinType = decideJoinType |
| |
| val joinColumn = mergeMatches.joinExpr.expr.asInstanceOf[EqualTo].left |
| .asInstanceOf[UnresolvedAttribute].nameParts.tail.head |
    // repartition the srcDS if the target table has bucketing and the bucketing column is the
    // same as the join column
| val repartitionedSrcDs = |
| if (carbonTable.getBucketingInfo != null && |
| carbonTable.getBucketingInfo |
| .getListOfColumns |
| .asScala |
| .exists(_.getColumnName.equalsIgnoreCase(joinColumn))) { |
| srcDS.repartition(carbonTable.getBucketingInfo.getNumOfRanges, srcDS.col(joinColumn)) |
| } else { |
| srcDS |
| } |
| // Add the getTupleId() udf to get the tuple id to generate delete delta. |
| val frame = |
| targetDs |
| .withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()")) |
| .withColumn("exist_on_target", lit(1)) |
| .join(repartitionedSrcDs.withColumn("exist_on_src", lit(1)), |
| mergeMatches.joinExpr, |
| joinType) |
| .withColumn(status_on_mergeds, condition) |
| if (LOGGER.isDebugEnabled) { |
| frame.explain() |
| } |
| val tableCols = |
| carbonTable.getCreateOrderColumn.asScala.map(_.getColName). |
| filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)) |
| val header = tableCols.mkString(",") |
| |
| val projections: Seq[Seq[MergeProjection]] = mergeMatches.matchList.map { m => |
| m.getActions.map { |
| case u: UpdateAction => MergeProjection(tableCols, |
| status_on_mergeds, |
| frame, |
| relations.head, |
| sparkSession, |
| u) |
| case i: InsertAction => MergeProjection(tableCols, |
| status_on_mergeds, |
| frame, |
| relations.head, |
| sparkSession, |
| i) |
| case d: DeleteAction => MergeProjection(tableCols, |
| status_on_mergeds, |
| frame, |
| relations.head, |
| sparkSession, |
| d) |
| case _ => null |
| }.filter(_ != null) |
| } |
| |
| val st = System.currentTimeMillis() |
| // Create accumulators to log the stats |
| val stats = Stats(createLongAccumulator("insertedRows"), |
| createLongAccumulator("updatedRows"), |
| createLongAccumulator("deletedRows")) |
| val targetSchema = StructType(tableCols.map { f => |
| relations.head.carbonRelation.schema.find(_.name.equalsIgnoreCase(f)).get |
| } ++ Seq(StructField(status_on_mergeds, IntegerType))) |
| val (processedRDD, deltaPath) = processIUD(sparkSession, frame, carbonTable, projections, |
| targetSchema, stats) |
| |
| val executorErrors = ExecutionErrors(FailureCauses.NONE, "") |
| val trxMgr = TranxManager(System.currentTimeMillis()) |
| |
| val mutationAction = MutationActionFactory.getMutationAction(sparkSession, |
| carbonTable, hasDelAction, hasUpdateAction, |
| insertHistOfUpdate, insertHistOfDelete) |
| |
| val loadDF = Dataset.ofRows(sparkSession, |
| LogicalRDD(targetSchema.toAttributes, |
| processedRDD)(sparkSession)) |
| |
| loadDF.cache() |
| val count = loadDF.count() |
| val updateTableModel = if (FileFactory.isFileExist(deltaPath)) { |
| val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, deltaPath) |
| val tuple = mutationAction.handleAction(deltaRdd, executorErrors, trxMgr) |
| FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath)) |
| if (!CarbonUpdateUtil.updateSegmentStatus(tuple._1.asScala.asJava, |
| carbonTable, |
| trxMgr.getLatestTrx.toString, false)) { |
| LOGGER.error("writing of update status file failed") |
| throw new CarbonMergeDataSetException("writing of update status file failed") |
| } |
| if (carbonTable.isHivePartitionTable) { |
| // If load count is 0 and if merge action contains delete operation, update |
| // tableUpdateStatus file name in loadMeta entry |
| if (count == 0 && hasDelAction && !tuple._1.isEmpty) { |
| val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath |
| .getTableStatusFilePath(carbonTable.getTablePath)) |
| CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail => |
| new Segment(loadMetadataDetail.getMergedLoadName, |
| loadMetadataDetail.getSegmentFile)).toSet.asJava, |
| carbonTable, |
| trxMgr.getLatestTrx.toString, |
| true, |
| tuple._2.asJava) |
| } |
| } |
| Some(UpdateTableModel(isUpdate = true, trxMgr.getLatestTrx, |
| executorErrors, tuple._2, loadAsNewSegment = true)) |
| } else { |
| None |
| } |
| |
| val dataFrame = loadDF.select(tableCols.map(col): _*) |
| CarbonInsertIntoCommand(databaseNameOp = Some(carbonTable.getDatabaseName), |
| tableName = carbonTable.getTableName, |
| options = Map("fileheader" -> header, "sort_scope" -> "no_sort"), |
| isOverwriteTable = false, |
| dataFrame.queryExecution.logical, |
| carbonTable.getTableInfo, |
| Map.empty, |
| Map.empty, |
| new OperationContext, |
| updateTableModel |
| ).run(sparkSession) |
| LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}") |
| LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}") |
| LOGGER.info(s"Total deleted rows: ${stats.deletedRows.sum}") |
| LOGGER.info( |
| " Time taken to merge data :: " + (System.currentTimeMillis() - st)) |
| |
    // Load the history table if the insert into history table action is added by the user.
| HistoryTableLoadHelper.loadHistoryTable(sparkSession, relations.head, carbonTable, |
| trxMgr, mutationAction, mergeMatches) |
| // Do IUD Compaction. |
| HorizontalCompaction.tryHorizontalCompaction( |
| sparkSession, carbonTable, isUpdateOperation = false) |
| Seq.empty |
| } |
| |
| // Decide join type based on match conditions |
| private def decideJoinType: String = { |
| if (containsWhenNotMatchedOnly) { |
      // if the match conditions contain WhenNotMatched only, then we need neither the
      // left table keys nor the matched keys
| "right_outer" |
| } else if (containsWhenMatchedOnly) { |
      // if the match conditions contain WhenMatched only, then we only need the matched keys
| "inner" |
| } else if (needKeyFromLeftTable) { |
| // if we need to keep keys from left table, then use full outer join |
| "full_outer" |
| } else { |
      // default: we need matched rows and source-only rows, so a right join is enough
| "right" |
| } |
| } |
| |
| private def needKeyFromLeftTable: Boolean = { |
| mergeMatches.matchList.exists(_.isInstanceOf[WhenNotMatchedAndExistsOnlyOnTarget]) |
| } |
| |
| private def containsWhenMatchedOnly: Boolean = { |
| mergeMatches.matchList.forall(_.isInstanceOf[WhenMatched]) |
| } |
| |
| private def containsWhenNotMatchedOnly: Boolean = { |
| mergeMatches.matchList.forall(_.isInstanceOf[WhenNotMatched]) |
| } |
| |
| /** |
   * Based on the status of each row, it either inserts the data or updates/deletes it.
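   * A minimal sketch (hypothetical status value) of how the per-row status bits are decoded to
   * find which match conditions applied, mirroring the loop over the projections below:
   * {{{
   *   val status = 5                  // binary 101 -> match conditions 0 and 2 applied
   *   val applied = (0 until 3).filter(i => (status & (1 << i)) == (1 << i))
   *   // applied contains 0 and 2; the projections of those matches are run for this row
   * }}}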
| */ |
| private def processIUD(sparkSession: SparkSession, |
| frame: DataFrame, |
| carbonTable: CarbonTable, |
| projections: Seq[Seq[MergeProjection]], |
| targetSchema: StructType, |
| stats: Stats): (RDD[InternalRow], String) = { |
| val frameCols = frame.queryExecution.analyzed.output |
| val status = frameCols.length - 1 |
| val tupleId = frameCols.zipWithIndex |
| .find(_._1.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).get._2 |
| val insertedRows = stats.insertedRows |
| val updatedRows = stats.updatedRows |
| val deletedRows = stats.deletedRows |
| val job = CarbonSparkUtil.createHadoopJob() |
| job.setOutputKeyClass(classOf[Void]) |
| job.setOutputValueClass(classOf[InternalRow]) |
| val uuid = UUID.randomUUID.toString |
| job.setJobID(new JobID(uuid, 0)) |
| val path = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "avro" |
| FileOutputFormat.setOutputPath(job, new Path(path)) |
| val schema = |
| org.apache.spark.sql.types.StructType(Seq( |
| StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, StringType), |
| StructField(status_on_mergeds, IntegerType))) |
| val factory = AvroFileFormatFactory.getAvroWriter(sparkSession, job, schema) |
| val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, job.getConfiguration) |
| (frame.rdd.mapPartitionsWithIndex { case (index, iter) => |
| val confB = config.value.value |
| val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index) |
| val attemptID = new TaskAttemptID(task, index) |
| val context = new TaskAttemptContextImpl(confB, attemptID) |
| val writer = factory.newInstance(path + CarbonCommonConstants.FILE_SEPARATOR + task.toString, |
| schema, context) |
| val projLen = projections.length |
| new Iterator[InternalRow] { |
| val queue = new util.LinkedList[InternalRow]() |
| override def hasNext: Boolean = if (!queue.isEmpty || iter.hasNext) true else { |
| writer.close() |
| false |
| } |
| |
| override def next(): InternalRow = { |
| if (!queue.isEmpty) return queue.poll() |
| val row = iter.next() |
| val rowWithSchema = row.asInstanceOf[GenericRowWithSchema] |
| val is = row.get(status) |
| var isUpdate = false |
| var isDelete = false |
| var insertedCount = 0 |
| if (is != null) { |
| val isInt = is.asInstanceOf[Int] |
| var i = 0 |
| while (i < projLen) { |
| if ((isInt & (1 << i)) == (1 << i)) projections(i).foreach { p => |
| if (!p.isDelete) { |
| if (p.isUpdate) isUpdate = p.isUpdate |
| queue.add(p(rowWithSchema)) |
| insertedCount += 1 |
| } else isDelete = true |
| } |
| i = i + 1 |
| } |
| } |
| val newArray = new Array[Any](2) |
| newArray(0) = UTF8String.fromString(row.getString(tupleId)) |
| if (isUpdate && isDelete) { |
| newArray(1) = 102 |
| writer.write(new GenericInternalRow(newArray)) |
| updatedRows.add(1) |
| deletedRows.add(1) |
| insertedCount -= 1 |
| } else if (isUpdate) { |
| updatedRows.add(1) |
| newArray(1) = 101 |
| insertedCount -= 1 |
| writer.write(new GenericInternalRow(newArray)) |
| } else if (isDelete) { |
| newArray(1) = 100 |
| deletedRows.add(1) |
| writer.write(new GenericInternalRow(newArray)) |
| } |
| insertedRows.add(insertedCount) |
| if (!queue.isEmpty) queue.poll() else { |
| val values = new Array[Any](targetSchema.length) |
| new GenericInternalRow(values) |
| } |
| } |
| } |
| }.filter { row => |
| val status = row.get(targetSchema.length-1, IntegerType) |
| status != null |
| }, path) |
| } |
| |
| private def createLongAccumulator(name: String) = { |
| val acc = new LongAccumulator |
| acc.setValue(0) |
    acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some(name),
      countFailedValues = false)
| AccumulatorContext.register(acc) |
| acc |
| } |
| |
| /** |
   * It generates conditions for all possible combinations of matches and assigns an integer
   * number to each combination. One row can match more than one condition, in which case the
   * actions of all the matched conditions are applied to that row.
   * For example:
   * whenmatched(a=c1)
| * update() |
| * whennotmatched(b=d1) |
| * insert() |
| * whennotmatched(b=d2) |
| * insert() |
| * |
| * The above merge statement will be converted to |
| * (case when a=c1 and b=d1 and b=d2 then 7 |
| * when a=c1 and b=d1 then 6 |
| * when a=c1 and b=d2 then 5 |
| * when a=c1 then 4 |
| * when b=d1 and b=d2 then 3 |
| * when b=d1 then 2 |
   * when b=d2 then 1) as status
| * |
   * So it is not recommended to use too many merge conditions, as the number of "case when"
   * branches grows exponentially.
| * |
   * @param mergeMatches merge matches containing the join expression and the match conditions
   * @return the generated status column
| */ |
| def generateStatusColumnWithAllCombinations(mergeMatches: MergeDataSetMatches): Column = { |
| var exprList = new ArrayBuffer[(Column, Int)]() |
| val matchList = mergeMatches.matchList |
| val len = matchList.length |
| val N = Math.pow(2d, len.toDouble).toInt |
| var i = 1 |
| while (i < N) { |
| var status = 0 |
| var column: Column = null |
| val code = Integer.toBinaryString(N | i).substring(1) |
| var j = 0 |
| while (j < len) { |
| if (code.charAt(j) == '1') { |
| val mergeMatch = matchList(j) |
| if (column == null) { |
| if (mergeMatch.getExp.isDefined) { |
| column = mergeMatch.getExp.get |
| } |
| } else { |
| if (mergeMatch.getExp.isDefined) { |
| column = column.and(mergeMatch.getExp.get) |
| } |
| } |
| mergeMatch match { |
| case wm: WhenMatched => |
| val existsOnBoth = col("exist_on_target").isNotNull.and( |
| col("exist_on_src").isNotNull) |
| column = if (column == null) { |
| existsOnBoth |
| } else { |
| column.and(existsOnBoth) |
| } |
| case wnm: WhenNotMatched => |
| val existsOnSrc = col("exist_on_target").isNull.and( |
| col("exist_on_src").isNotNull) |
| column = if (column == null) { |
| existsOnSrc |
| } else { |
| column.and(existsOnSrc) |
| } |
| case wnm: WhenNotMatchedAndExistsOnlyOnTarget => |
| val existsOnSrc = col("exist_on_target").isNotNull.and( |
| col("exist_on_src").isNull) |
| column = if (column == null) { |
| existsOnSrc |
| } else { |
| column.and(existsOnSrc) |
| } |
| case _ => |
| } |
| status = status | 1 << j |
| } |
| j += 1 |
| } |
| if (column == null) { |
| column = lit(true) === lit(true) |
| } |
| exprList += ((column, status)) |
| i += 1 |
| } |
| exprList = exprList.reverse |
| var condition: Column = null |
| exprList.foreach { case (col, status) => |
| if (condition == null) { |
| condition = when(col, lit(status)) |
| } else { |
| condition = condition.when(col, lit(status)) |
| } |
| } |
| condition.otherwise(lit(null)) |
| } |
| |
  /**
   * Collects the target dataset attributes referenced by the join expression, the match
   * conditions and the merge actions, and returns the corresponding columns so that only the
   * required columns are selected before the join.
   */
  private def getSelectExpressionsOnExistingDF(existingDs: Dataset[Row],
| mergeMatches: MergeDataSetMatches, sparkSession: SparkSession): Seq[Column] = { |
| var projects = Seq.empty[Attribute] |
| val existAttrs = existingDs.queryExecution.analyzed.output |
| projects ++= selectAttributes(mergeMatches.joinExpr.expr, existingDs, sparkSession) |
| mergeMatches.matchList.foreach { m => |
| if (m.getExp.isDefined) { |
| projects ++= selectAttributes(m.getExp.get.expr, existingDs, sparkSession) |
| } |
| m.getActions.foreach { |
| case u: UpdateAction => |
| projects ++= existAttrs.filterNot { f => |
| u.updateMap.exists(_._1.toString().equalsIgnoreCase(f.name)) |
| } |
| case i: InsertAction => |
| if (!existAttrs.forall(f => i.insertMap |
| .exists(_._1.toString().equalsIgnoreCase(f.name)))) { |
| throw new CarbonMergeDataSetException( |
| "Not all source columns are mapped for insert action " + i.insertMap) |
| } |
| i.insertMap.foreach { case (k, v) => |
| projects ++= selectAttributes(v.expr, existingDs, sparkSession) |
| } |
| case _ => |
| } |
| } |
| projects.map(_.name.toLowerCase).distinct.map { p => |
| existingDs.col(p) |
| } |
| } |
| |
  /**
   * Fills every update action with identity mappings for the target columns the user did not
   * specify, so that later processing can assume a mapping exists for each column. Matches
   * containing delete actions are moved to the end of the match list.
   */
  private def updateMappingIfNotExists(mergeMatches: MergeDataSetMatches,
| existingDs: Dataset[Row]): MergeDataSetMatches = { |
| val existAttrs = existingDs.queryExecution.analyzed.output |
| val updateCommand = mergeMatches.matchList.map { m => |
| val updateAction = m.getActions.map { |
| case u: UpdateAction => |
| if (u.updateMap.isEmpty) { |
| throw new CarbonMergeDataSetException( |
| "At least one column supposed to be updated for update action") |
| } |
| val attributes = existAttrs.filterNot { f => |
| u.updateMap.exists(_._1.toString().equalsIgnoreCase(f.name)) |
| } |
| val newMap = attributes.map(a => (existingDs.col(a.name), existingDs.col(a.name))).toMap |
| u.copy(u.updateMap ++ newMap) |
| case other => other |
| } |
| m.updateActions(updateAction) |
| } |
| mergeMatches.copy(matchList = |
| updateCommand.filterNot(_.getActions.exists(_.isInstanceOf[DeleteAction])) |
| ++ updateCommand.filter(_.getActions.exists(_.isInstanceOf[DeleteAction]))) |
| } |
| |
| private def selectAttributes(expression: Expression, existingDs: Dataset[Row], |
| sparkSession: SparkSession, throwError: Boolean = false) = { |
| expression.collect { |
| case a: Attribute => |
| val resolved = existingDs.queryExecution |
| .analyzed.resolveQuoted(a.name, sparkSession.sessionState.analyzer.resolver) |
| if (resolved.isDefined) { |
| resolved.get.toAttribute |
| } else if (throwError) { |
| throw new CarbonMergeDataSetException( |
| expression + " cannot be resolved with dataset " + existingDs) |
| } else { |
| null |
| } |
| }.filter(_ != null) |
| } |
| |
| private def getInsertHistoryStatus(mergeMatches: MergeDataSetMatches): (Boolean, Boolean) = { |
| val insertHistOfUpdate = mergeMatches.matchList.exists(p => |
| p.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction]) |
| && p.getActions.exists(_.isInstanceOf[UpdateAction])) |
| val insertHistOfDelete = mergeMatches.matchList.exists(p => |
| p.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction]) |
| && p.getActions.exists(_.isInstanceOf[DeleteAction])) |
| (insertHistOfUpdate, insertHistOfDelete) |
| } |
| |
  /**
   * Validates the merge matches and actions: delete and update actions must not be under the
   * same match condition, at most one delete action is allowed across the merge, and an insert
   * into the history table must accompany an update or delete action with all columns mapped.
   */
  private def validateMergeActions(mergeMatches: MergeDataSetMatches,
| existingDs: Dataset[Row], sparkSession: SparkSession): Unit = { |
| val existAttrs = existingDs.queryExecution.analyzed.output |
| if (mergeMatches.matchList.exists(m => m.getActions.exists(_.isInstanceOf[DeleteAction]) |
| && m.getActions.exists(_.isInstanceOf[UpdateAction]))) { |
| throw new AnalysisException( |
| "Delete and update action should not be under same merge condition") |
| } |
| if (mergeMatches.matchList.count(m => m.getActions.exists(_.isInstanceOf[DeleteAction])) > 1) { |
| throw new AnalysisException("Delete action should not be more than once across merge") |
| } |
| mergeMatches.matchList.foreach { f => |
| if (f.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction])) { |
| if (!(f.getActions.exists(_.isInstanceOf[UpdateAction]) || |
| f.getActions.exists(_.isInstanceOf[DeleteAction]))) { |
| throw new AnalysisException("For inserting to history table, " + |
| "it should be along with either update or delete action") |
| } |
| val value = f.getActions.find(_.isInstanceOf[InsertInHistoryTableAction]).get. |
| asInstanceOf[InsertInHistoryTableAction] |
| if (!existAttrs.forall(f => value.insertMap |
| .exists(_._1.toString().equalsIgnoreCase(f.name)))) { |
| throw new AnalysisException( |
| "Not all source columns are mapped for insert action " + value.insertMap) |
| } |
| value.insertMap.foreach { case (k, v) => |
| selectAttributes(v.expr, existingDs, sparkSession, throwError = true) |
| } |
| } |
| } |
| } |
| |
| override protected def opName: String = "MERGE DATASET" |
| } |