[CARBONDATA-3929] Improve CDC performance

Why is this PR needed?
This PR is to improve the CDC merge performance. CDC is currently very slow in the case of full outer joins and slow in normal cases. Identified pain points are as below:
1. currently we are writing the intermediate delete data to carbon format, which is the columnar format, and we do a full scan which is slow. Here since intermediate, we do full scan, compression, columnar format, its all-time taking.
2. Full outer join case is very slow.
3. when we insert new data into new segments, we follow the old insert flow with the converter step.
4. since we write the intermediate data carbon format, we use coalesce to limit the partition to number of active executors.

What changes were proposed in this PR?
Some improvements points are identified as below
1. Write the intermediate data to a faster row format like Avro.
2. use bucketing on join column and do the repartition of the Dataframe before performing the join operation, which avoids the shuffle on one side as shuffle is major time consuming part in join.
3. make the insert flow to the new flow without the converter step.
4. remove coalesce and can use resource to write the intermediate Avro data in a faster way.

Performance results
DataSize -> 2GB target table data
230MB source table data
InnerJoin case - around 17000+ deleted rows, 70400 odd updated rows
Full outer Join case - 2million target data, 0.2million src data, 70400 odd rows updated and some deleted
                Old Time(sec)		        New Time(sec)
Join Type	1st time query	2nd time query	1st time query	2nd time query
Inner Join	20      	9.6     	14      	4.6
Full Outer Join	43      	17.8    	26      	7.7

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #3856
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index b5f2b9d..d5f5772 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -154,6 +154,28 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>com.databricks</groupId>
+      <artifactId>spark-avro_${scala.binary.version}</artifactId>
+      <version>4.0.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-avro_${scala.binary.version}</artifactId>
+      <version>2.4.5</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
       <exclusions>
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 6a655e3..e622671 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -28,7 +28,7 @@
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.command.AtomicRunnableCommand
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, UpdateTableModel}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.CausedBy
@@ -64,7 +64,8 @@
     var tableInfo: TableInfo,
     var internalOptions: Map[String, String] = Map.empty,
     var partition: Map[String, Option[String]] = Map.empty,
-    var operationContext: OperationContext = new OperationContext)
+    var operationContext: OperationContext = new OperationContext,
+    var updateModel: Option[UpdateTableModel] = None)
   extends AtomicRunnableCommand {
 
   var table: CarbonTable = _
@@ -200,11 +201,20 @@
           options = options.asJava,
           isOverwriteTable = isOverwriteTable,
           isDataFrame = true,
-          updateModel = None,
+          updateModel = updateModel,
           operationContext = operationContext)
 
       // add the start entry for the new load in the table status file
-      if (!table.isHivePartitionTable) {
+      if ((updateModel.isEmpty || updateModel.isDefined && updateModel.get.loadAsNewSegment)
+          && !table.isHivePartitionTable) {
+        if (updateModel.isDefined ) {
+          carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
+        }
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+          carbonLoadModel,
+          isOverwriteTable)
+        isUpdateTableStatusRequired = true
+      } else if (!table.isHivePartitionTable) {
         CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
           carbonLoadModel,
           isOverwriteTable)
@@ -238,7 +248,7 @@
         partitionStatus,
         None,
         Some(scanResultRdd),
-        None,
+        updateModel,
         operationContext)
       LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
       val (rows, loadResult) = insertData(loadParams)
@@ -439,6 +449,13 @@
 
   def insertData(loadParams: CarbonLoadParams): (Seq[Row], LoadMetadataDetails) = {
     var rows = Seq.empty[Row]
+    val loadDataFrame = if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
+      // TODO: handle the update flow for new insert into flow without converter step
+      throw new UnsupportedOperationException(
+        "Update flow is not supported without no converter step yet.")
+    } else {
+      Some(dataFrame)
+    }
     val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     var loadResult : LoadMetadataDetails = null
     if (table.isHivePartitionTable) {
@@ -449,9 +466,9 @@
         loadParams.partitionStatus,
         isOverwriteTable,
         loadParams.hadoopConf,
-        None,
+        loadDataFrame,
         loadParams.scanResultRDD,
-        None,
+        updateModel,
         operationContext)
     }
     (rows, loadResult)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index bdc3043..5e75c56 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -26,18 +26,17 @@
 import org.apache.hadoop.mapreduce.{Job, JobID, TaskAttemptID, TaskID, TaskType}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, CarbonUtils, Column, DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonUtils, Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.avro.AvroFileFormatFactory
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, GenericRowWithSchema}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo, Expression, GenericInternalRow, GenericRowWithSchema}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.{DataCommand, ExecutionErrors, UpdateTableModel}
-import org.apache.spark.sql.execution.command.management.CarbonInsertIntoWithDf
+import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
 import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.unsafe.types.UTF8String
@@ -52,6 +51,7 @@
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
@@ -79,15 +79,15 @@
    *
    */
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val rltn = CarbonUtils.collectCarbonRelation(targetDsOri.logicalPlan)
+    val relations = CarbonUtils.collectCarbonRelation(targetDsOri.logicalPlan)
     // Target dataset must be backed by carbondata table.
-    if (rltn.length != 1) {
+    if (relations.length != 1) {
       throw new UnsupportedOperationException(
         "Carbon table supposed to be present in merge dataset")
     }
     // validate the merge matches and actions.
     validateMergeActions(mergeMatches, targetDsOri, sparkSession)
-    val carbonTable = rltn.head.carbonRelation.carbonTable
+    val carbonTable = relations.head.carbonRelation.carbonTable
     val hasDelAction = mergeMatches.matchList
       .exists(_.getActions.exists(_.isInstanceOf[DeleteAction]))
     val hasUpdateAction = mergeMatches.matchList
@@ -106,18 +106,34 @@
     // decide join type based on match conditions
     val joinType = decideJoinType
 
+    val joinColumn = mergeMatches.joinExpr.expr.asInstanceOf[EqualTo].left
+      .asInstanceOf[UnresolvedAttribute].nameParts.tail.head
+    // repartition the srsDs, if the target has bucketing and the bucketing column and join column
+    // are same
+    val repartitionedSrcDs =
+      if (carbonTable.getBucketingInfo != null &&
+          carbonTable.getBucketingInfo
+            .getListOfColumns
+            .asScala
+            .exists(_.getColumnName.equalsIgnoreCase(joinColumn))) {
+      srcDS.repartition(carbonTable.getBucketingInfo.getNumOfRanges, srcDS.col(joinColumn))
+    } else {
+      srcDS
+    }
     // Add the getTupleId() udf to get the tuple id to generate delete delta.
     val frame =
       targetDs
         .withColumn(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, expr("getTupleId()"))
         .withColumn("exist_on_target", lit(1))
-        .join(srcDS.withColumn("exist_on_src", lit(1)), mergeMatches.joinExpr, joinType)
+        .join(repartitionedSrcDs.withColumn("exist_on_src", lit(1)),
+          mergeMatches.joinExpr,
+          joinType)
         .withColumn(status_on_mergeds, condition)
     if (LOGGER.isDebugEnabled) {
       frame.explain()
     }
     val tableCols =
-      carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala.map(_.getColumnName).
+      carbonTable.getCreateOrderColumn.asScala.map(_.getColName).
         filterNot(_.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE))
     val header = tableCols.mkString(",")
 
@@ -126,19 +142,19 @@
         case u: UpdateAction => MergeProjection(tableCols,
           status_on_mergeds,
           frame,
-          rltn.head,
+          relations.head,
           sparkSession,
           u)
         case i: InsertAction => MergeProjection(tableCols,
           status_on_mergeds,
           frame,
-          rltn.head,
+          relations.head,
           sparkSession,
           i)
         case d: DeleteAction => MergeProjection(tableCols,
           status_on_mergeds,
           frame,
-          rltn.head,
+          relations.head,
           sparkSession,
           d)
         case _ => null
@@ -151,7 +167,7 @@
       createLongAccumulator("updatedRows"),
       createLongAccumulator("deletedRows"))
     val targetSchema = StructType(tableCols.map { f =>
-      rltn.head.carbonRelation.schema.find(_.name.equalsIgnoreCase(f)).get
+      relations.head.carbonRelation.schema.find(_.name.equalsIgnoreCase(f)).get
     } ++ Seq(StructField(status_on_mergeds, IntegerType)))
     val (processedRDD, deltaPath) = processIUD(sparkSession, frame, carbonTable, projections,
       targetSchema, stats)
@@ -170,7 +186,7 @@
     loadDF.cache()
     val count = loadDF.count()
     val updateTableModel = if (FileFactory.isFileExist(deltaPath)) {
-      val deltaRdd = sparkSession.read.format("carbon").load(deltaPath).rdd
+      val deltaRdd = AvroFileFormatFactory.readAvro(sparkSession, deltaPath)
       val tuple = mutationAction.handleAction(deltaRdd, executorErrors, trxMgr)
       FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
       if (!CarbonUpdateUtil.updateSegmentStatus(tuple._1.asScala.asJava,
@@ -194,29 +210,32 @@
             tuple._2.asJava)
         }
       }
-      Some(UpdateTableModel(true, trxMgr.getLatestTrx,
-        executorErrors, tuple._2, true))
+      Some(UpdateTableModel(isUpdate = true, trxMgr.getLatestTrx,
+        executorErrors, tuple._2, loadAsNewSegment = true))
     } else {
       None
     }
 
-    CarbonInsertIntoWithDf(
-      databaseNameOp = Some(carbonTable.getDatabaseName),
+    val dataFrame = loadDF.select(tableCols.map(col): _*)
+    CarbonInsertIntoCommand(databaseNameOp = Some(carbonTable.getDatabaseName),
       tableName = carbonTable.getTableName,
-      options = Map("fileheader" -> header, "sort_scope" -> "nosort"),
+      options = Map("fileheader" -> header, "sort_scope" -> "no_sort"),
       isOverwriteTable = false,
-      dataFrame = loadDF.select(tableCols.map(col): _*),
-      updateModel = updateTableModel,
-      tableInfoOp = Some(carbonTable.getTableInfo)).process(sparkSession)
-
+      dataFrame.queryExecution.logical,
+      carbonTable.getTableInfo,
+      Map.empty,
+      Map.empty,
+      new OperationContext,
+      updateTableModel
+    ).run(sparkSession)
     LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}")
     LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}")
     LOGGER.info(s"Total deleted rows: ${stats.deletedRows.sum}")
     LOGGER.info(
       " Time taken to merge data  :: " + (System.currentTimeMillis() - st))
 
-    // Load the history table if the insert history table action is added by user.
-    HistoryTableLoadHelper.loadHistoryTable(sparkSession, rltn.head, carbonTable,
+  // Load the history table if the insert history table action is added by user.
+    HistoryTableLoadHelper.loadHistoryTable(sparkSession, relations.head, carbonTable,
       trxMgr, mutationAction, mergeMatches)
     // Do IUD Compaction.
     HorizontalCompaction.tryHorizontalCompaction(
@@ -262,7 +281,7 @@
       carbonTable: CarbonTable,
       projections: Seq[Seq[MergeProjection]],
       targetSchema: StructType,
-      stats: Stats) = {
+      stats: Stats): (RDD[InternalRow], String) = {
     val frameCols = frame.queryExecution.analyzed.output
     val status = frameCols.length - 1
     val tupleId = frameCols.zipWithIndex
@@ -275,37 +294,26 @@
     job.setOutputValueClass(classOf[InternalRow])
     val uuid = UUID.randomUUID.toString
     job.setJobID(new JobID(uuid, 0))
-    val path = carbonTable.getTablePath + "/" + job.getJobID
+    val path = carbonTable.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + "avro"
     FileOutputFormat.setOutputPath(job, new Path(path))
     val schema =
       org.apache.spark.sql.types.StructType(Seq(
         StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, StringType),
         StructField(status_on_mergeds, IntegerType)))
-    val factory =
-      new SparkCarbonFileFormat().prepareWrite(sparkSession, job,
-        Map(), schema)
+    val factory = AvroFileFormatFactory.getAvroWriter(sparkSession, job, schema)
     val config = SparkSQLUtil.broadCastHadoopConf(sparkSession.sparkContext, job.getConfiguration)
-    (frame.rdd.coalesce(DistributionUtil.getConfiguredExecutors(sparkSession.sparkContext)).
-      mapPartitionsWithIndex { case (index, iter) =>
-        var directlyWriteDataToHdfs = CarbonProperties.getInstance()
-          .getProperty(CarbonLoadOptionConstants
-            .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, CarbonLoadOptionConstants
-            .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT)
-        CarbonProperties.getInstance().addProperty(CarbonLoadOptionConstants
-          .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true")
-        val confB = config.value.value
-        val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index)
-        val attemptID = new TaskAttemptID(task, index)
-        val context = new TaskAttemptContextImpl(confB, attemptID)
-        val writer = factory.newInstance(path, schema, context)
-        val projLen = projections.length
+    (frame.rdd.mapPartitionsWithIndex { case (index, iter) =>
+      val confB = config.value.value
+      val task = new TaskID(new JobID(uuid, 0), TaskType.MAP, index)
+      val attemptID = new TaskAttemptID(task, index)
+      val context = new TaskAttemptContextImpl(confB, attemptID)
+      val writer = factory.newInstance(path + CarbonCommonConstants.FILE_SEPARATOR + task.toString,
+        schema, context)
+      val projLen = projections.length
         new Iterator[InternalRow] {
           val queue = new util.LinkedList[InternalRow]()
           override def hasNext: Boolean = if (!queue.isEmpty || iter.hasNext) true else {
             writer.close()
-            // revert load direct write to store path after insert
-            CarbonProperties.getInstance().addProperty(CarbonLoadOptionConstants
-              .ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, directlyWriteDataToHdfs)
             false
           }
 
@@ -365,7 +373,8 @@
   private def createLongAccumulator(name: String) = {
     val acc = new LongAccumulator
     acc.setValue(0)
-    acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some(name), false)
+    acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some(name), countFailedValues
+      = false)
     AccumulatorContext.register(acc)
     acc
   }
@@ -541,7 +550,7 @@
     }.filter(_ != null)
   }
 
-  private def getInsertHistoryStatus(mergeMatches: MergeDataSetMatches) = {
+  private def getInsertHistoryStatus(mergeMatches: MergeDataSetMatches): (Boolean, Boolean) = {
     val insertHistOfUpdate = mergeMatches.matchList.exists(p =>
       p.getActions.exists(_.isInstanceOf[InsertInHistoryTableAction])
       && p.getActions.exists(_.isInstanceOf[UpdateAction]))
@@ -577,7 +586,7 @@
             "Not all source columns are mapped for insert action " + value.insertMap)
         }
         value.insertMap.foreach { case (k, v) =>
-          selectAttributes(v.expr, existingDs, sparkSession, true)
+          selectAttributes(v.expr, existingDs, sparkSession, throwError = true)
         }
       }
     }
diff --git a/integration/spark/src/main/spark2.3/com/databricks/spark/avro/AvroWriter.scala b/integration/spark/src/main/spark2.3/com/databricks/spark/avro/AvroWriter.scala
new file mode 100644
index 0000000..8de31be
--- /dev/null
+++ b/integration/spark/src/main/spark2.3/com/databricks/spark/avro/AvroWriter.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.databricks.spark.avro
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+
+/**
+ * This class is to get the avro writer from databricks avro module, as its not present in spark2.3
+ * and spark-avro module is included in spark project from spark-2.4. So for spark-2.4, we use Avro
+ * writer from spark project.
+ */
+object AvroWriter {
+
+  def getWriter(spark: org.apache.spark.sql.SparkSession,
+      job: org.apache.hadoop.mapreduce.Job,
+      dataSchema: org.apache.spark.sql.types.StructType,
+      options: scala.Predef.Map[scala.Predef.String, scala.Predef.String] = Map.empty)
+  : OutputWriterFactory = {
+    new DefaultSource().prepareWrite(spark, job,
+      options, dataSchema)
+  }
+}
+
+/**
+ * This reads the avro files from the given path and return the RDD[Row]
+ */
+object AvroReader {
+
+  def readAvro(spark: org.apache.spark.sql.SparkSession, deltaPath: String): RDD[Row] = {
+    spark.sparkContext
+      .hadoopConfiguration
+      .set("avro.mapred.ignore.inputs.without.extension", "false")
+    spark.read.avro(deltaPath).rdd
+  }
+}
diff --git a/integration/spark/src/main/spark2.3/org/apache/spark/sql/avro/AvroFileFormatFactory.scala b/integration/spark/src/main/spark2.3/org/apache/spark/sql/avro/AvroFileFormatFactory.scala
new file mode 100644
index 0000000..616f052
--- /dev/null
+++ b/integration/spark/src/main/spark2.3/org/apache/spark/sql/avro/AvroFileFormatFactory.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro
+
+import com.databricks.spark.avro.{AvroReader, AvroWriter}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+
+object AvroFileFormatFactory {
+
+  /**
+   * return the avro writer to write the avro files
+   * @return avro writer
+   */
+  def getAvroWriter(spark: org.apache.spark.sql.SparkSession,
+      job: org.apache.hadoop.mapreduce.Job,
+      dataSchema: org.apache.spark.sql.types.StructType,
+      options: scala.Predef.Map[scala.Predef.String, scala.Predef.String] = Map.empty)
+  : OutputWriterFactory = {
+    AvroWriter.getWriter(spark, job, dataSchema, options)
+  }
+
+  /**
+   * Reads the avro files present at the given path
+   * @param deltaPath path to read the avro files from.
+   * @return RDD[Row]
+   */
+  def readAvro(spark: org.apache.spark.sql.SparkSession, deltaPath: String): RDD[Row] = {
+    spark.sparkContext
+      .hadoopConfiguration
+      .set("avro.mapred.ignore.inputs.without.extension", "false")
+    AvroReader.readAvro(spark, deltaPath)
+  }
+}
diff --git a/integration/spark/src/main/spark2.4/org/apache/spark/sql/avro/AvroFileFormatFactory.scala b/integration/spark/src/main/spark2.4/org/apache/spark/sql/avro/AvroFileFormatFactory.scala
new file mode 100644
index 0000000..7fcd5dc
--- /dev/null
+++ b/integration/spark/src/main/spark2.4/org/apache/spark/sql/avro/AvroFileFormatFactory.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+
+object AvroFileFormatFactory {
+
+  /**
+   * return the avro writer to write the avro files
+   * @return avro writer
+   */
+  def getAvroWriter(spark: org.apache.spark.sql.SparkSession,
+      job: org.apache.hadoop.mapreduce.Job,
+      dataSchema: org.apache.spark.sql.types.StructType,
+      options: scala.Predef.Map[scala.Predef.String, scala.Predef.String] = Map.empty)
+  : OutputWriterFactory = {
+    new AvroFileFormat().prepareWrite(spark, job, options, dataSchema)
+  }
+
+  /**
+   * Reads the avro files present at the given path
+   * @param deltaPath path to read the avro files from.
+   * @return RDD[Row]
+   */
+  def readAvro(spark: org.apache.spark.sql.SparkSession, deltaPath: String): RDD[Row] = {
+    spark.sparkContext
+      .hadoopConfiguration
+      .set("avro.mapred.ignore.inputs.without.extension", "false")
+    spark.read.format("avro").load(s"$deltaPath").rdd
+  }
+}
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index 916846a..56b98a3 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -91,6 +91,11 @@
     (dwSelframe, odsframe)
   }
 
+  private def initializeWithBucketing = {
+    sql("create table order(id string, name string, c_name string, quantity int, price int, state int) stored as carbondata tblproperties('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='id')")
+    initialize
+  }
+
   private def initializeGloabalSort = {
     val initframe = generateData(10)
     initframe.write
@@ -821,11 +826,14 @@
     dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched().
       insertExpr(insertMap).execute()
     val sdf = new SimpleDateFormat("yyyy-MM-dd")
+    // in case of cdc, the insert into flow goes to no converter step to save time as incoming data
+    // from source will be correct, we wont use the table level timestamp format or load level for
+    // the insert into of cdc data.
     checkAnswer(
       sql("select date,time from order where id = 'id1'"),
       Seq(
         Row(new Date(sdf.parse("2015-07-23").getTime), Timestamp.valueOf("2015-03-03 12:25:00")),
-        Row(new Date(sdf.parse("2015-07-23").getTime), Timestamp.valueOf("2015-05-23 10:30:00"))
+        Row(new Date(sdf.parse("2015-07-23").getTime), Timestamp.valueOf("2015-05-23 10:30:30"))
       ))
     checkAnswer(
       sql("select date,time from order where id = 'id11'"),
@@ -834,6 +842,36 @@
       ))
   }
 
+  test("test merge update and insert with condition and expression and delete action with target table as bucketing") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initializeWithBucketing
+
+    var matches = Seq.empty[MergeMatch]
+    val updateMap = Map(col("id") -> col("A.id"),
+      col("price") -> expr("B.price + 1"),
+      col("state") -> col("B.state"))
+
+    val insertMap = Map(col("id") -> col("B.id"),
+      col("name") -> col("B.name"),
+      col("c_name") -> col("B.c_name"),
+      col("quantity") -> col("B.quantity"),
+      col("price") -> expr("B.price * 100"),
+      col("state") -> col("B.state"))
+
+    matches ++= Seq(WhenMatched(Some(col("A.state") =!= col("B.state"))).addAction(UpdateAction(updateMap)))
+    matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
+    matches ++= Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()))
+
+    CarbonMergeDataSetCommand(dwSelframe,
+      odsframe,
+      MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
+    assert(getDeleteDeltaFileCount("order", "0") == 2)
+    checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2)))
+    checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
+    checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
+    checkAnswer(sql("select price from order where id = 'newid1'"), Seq(Row(7500)))
+  }
+
   case class Target (id: Int, value: String, remark: String, mdt: String)
   case class Change (id: Int, value: String, change_type: String, mdt: String)
   private val numInitialRows = 10