[HOTFIX] Fix InsertFromStage complex data type issue for partition table

Why is this PR needed?

CarbonInsertFromStageCommand with complex data type are not working fine.

What changes were proposed in this PR?

For the partition table, the complex data type of target table should be converted to the binary data type.

Does this PR introduce any user interface change?

No

Is any new testcase added?

Yes

This closes #3556
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index c07f08b..5471420 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -75,7 +75,7 @@
   /**
    * DataType converter for different computing engines
    */
-  private static final ThreadLocal<DataTypeConverter> converter = new ThreadLocal<>();
+  private static DataTypeConverter converter;
 
   /**
    * This method will convert a given value to its specific type
@@ -84,8 +84,8 @@
    * @param dataType
    * @return
    */
-  public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
-      int scale, int precision) {
+  public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType, int scale,
+      int precision) {
     return getMeasureValueBasedOnDataType(msrValue, dataType, scale, precision, false);
   }
 
@@ -105,7 +105,7 @@
           new BigDecimal(msrValue).setScale(scale, RoundingMode.HALF_UP);
       BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
       if (useConverter) {
-        return getDataTypeConverter().convertFromBigDecimalToDecimal(decimal);
+        return converter.convertFromBigDecimalToDecimal(decimal);
       } else {
         return decimal;
       }
@@ -140,11 +140,10 @@
     if (dataType == DataTypes.BOOLEAN) {
       return BooleanConvert.parseBoolean(dimValue);
     } else if (DataTypes.isDecimal(dataType)) {
-      BigDecimal bigDecimal =
-          new BigDecimal(dimValue).setScale(scale, RoundingMode.HALF_UP);
+      BigDecimal bigDecimal = new BigDecimal(dimValue).setScale(scale, RoundingMode.HALF_UP);
       BigDecimal decimal = normalizeDecimalValue(bigDecimal, precision);
       if (useConverter) {
-        return getDataTypeConverter().convertFromBigDecimalToDecimal(decimal);
+        return converter.convertFromBigDecimalToDecimal(decimal);
       } else {
         return decimal;
       }
@@ -457,7 +456,7 @@
       }
     } else {
       // Default action for String/Varchar
-      return getDataTypeConverter().convertFromStringToUTF8String(dimensionValue);
+      return converter.convertFromStringToUTF8String(dimensionValue);
     }
   }
 
@@ -518,7 +517,7 @@
     } else if (actualDataType == DataTypes.LONG) {
       return ByteUtil.toXorBytes((Long) dimensionValue);
     } else if (actualDataType == DataTypes.TIMESTAMP) {
-      return ByteUtil.toXorBytes((Long)dimensionValue);
+      return ByteUtil.toXorBytes((Long) dimensionValue);
     } else {
       // Default action for String/Varchar
       return ByteUtil.toBytes(dimensionValue.toString());
@@ -970,11 +969,12 @@
 
   /**
    * set the data type converter as per computing engine
+   *
    * @param converterLocal
    */
   public static void setDataTypeConverter(DataTypeConverter converterLocal) {
     if (converterLocal != null) {
-      converter.set(converterLocal);
+      converter = converterLocal;
       timeStampformatter.remove();
       dateformatter.remove();
     }
@@ -989,17 +989,10 @@
   }
 
   public static DataTypeConverter getDataTypeConverter() {
-    DataTypeConverter dataTypeConverter = converter.get();
-    if (dataTypeConverter == null) {
-      synchronized (converter) {
-        dataTypeConverter = converter.get();
-        if (dataTypeConverter == null) {
-          dataTypeConverter = new DataTypeConverterImpl();
-          converter.set(dataTypeConverter);
-        }
-      }
+    if (converter == null) {
+      converter = new DataTypeConverterImpl();
     }
-    return dataTypeConverter;
+    return converter;
   }
 
   public static DataType valueOf(String name) {
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
index c24c3bf..ac39bd0 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
@@ -113,6 +113,9 @@
     synchronized (this) {
       if (!this.flushed) {
         this.closeWriters();
+        this.commit();
+        this.writerFactory.reset();
+        this.writeCount.set(0);
         this.flushed = true;
       }
     }
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
index 0c8ccbd..1d3ec6b 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
@@ -120,6 +120,9 @@
     synchronized (this) {
       if (!this.flushed) {
         this.closeWriters();
+        this.commit();
+        this.writerFactory.reset();
+        this.writeCount.set(0);
         this.flushed = true;
       }
     }
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index cc3c4b4..447e83e 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -71,11 +71,11 @@
       environment.enableCheckpointing(2000L)
       environment.setRestartStrategy(RestartStrategies.noRestart)
 
-      val dataCount = 10000
+      val dataCount = 1000
       val source = new TestSource(dataCount) {
         @throws[InterruptedException]
         override def get(index: Int): Array[AnyRef] = {
-          val data = new Array[AnyRef](5)
+          val data = new Array[AnyRef](7)
           data(0) = "test" + index
           data(1) = index.asInstanceOf[AnyRef]
           data(2) = 12345.asInstanceOf[AnyRef]
@@ -86,7 +86,7 @@
 
         @throws[InterruptedException]
         override def onFinish(): Unit = {
-          Thread.sleep(30000L)
+          Thread.sleep(5000L)
         }
       }
       val stream = environment.addSource(source)
@@ -118,18 +118,99 @@
       assertResult(false)(FileFactory
         .getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
 
-      // ensure the carbon data file count in data directory
-      // is same of the data file count which stage files recorded.
-      assertResult(true)(FileFactory.getCarbonFile(dataLocation).listFiles().length ==
-        collectStageInputs(CarbonTablePath.getStageDir(tablePath)).map(
-          stageInput =>
-            stageInput.getLocations.asScala.map(location => location.getFiles.size()).sum
-        ).sum
+      sql(s"INSERT INTO $tableName STAGE")
+
+      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
+
+    } finally {
+      sql(s"drop table if exists $tableName").collect()
+      delDir(new File(dataPath))
+    }
+  }
+
+  @Test
+  def testComplexType(): Unit = {
+    sql(s"drop table if exists $tableName").collect()
+    sql(
+      s"""
+         | CREATE TABLE $tableName (stringField string, intField int, shortField short,
+         | structField struct<value1:string,value2:int,value3:int>, binaryField struct<value1:binary>)
+         | STORED AS carbondata
+         | PARTITIONED BY (hour_ string, date_ string)
+         | TBLPROPERTIES ('SORT_COLUMNS'='hour_,date_,stringField', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin
+    ).collect()
+
+    val rootPath = System.getProperty("user.dir") + "/target/test-classes"
+
+    val dataTempPath = rootPath + "/data/temp/"
+    val dataPath = rootPath + "/data/"
+    delDir(new File(dataPath))
+    new File(dataPath).mkdir()
+
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+
+      val writerProperties = newWriterProperties(dataTempPath, dataPath, storeLocation)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.setParallelism(6)
+      environment.enableCheckpointing(2000L)
+      environment.setRestartStrategy(RestartStrategies.noRestart)
+
+      val dataCount = 1000
+      val source = new TestSource(dataCount) {
+        @throws[InterruptedException]
+        override def get(index: Int): Array[AnyRef] = {
+          val data = new Array[AnyRef](7)
+          data(0) = "test" + index
+          data(1) = index.asInstanceOf[AnyRef]
+          data(2) = 12345.asInstanceOf[AnyRef]
+          data(3) = "test\0011\0012"
+          data(4) = "test"
+          data(5) = Integer.toString(TestSource.randomCache.get().nextInt(24))
+          data(6) = "20191218"
+          data
+        }
+
+        @throws[InterruptedException]
+        override def onFinish(): Unit = {
+          Thread.sleep(5000L)
+        }
+      }
+      val stream = environment.addSource(source)
+      val factory = CarbonWriterFactory.builder("Local").build(
+        "default",
+        tableName,
+        tablePath,
+        new Properties,
+        writerProperties,
+        carbonProperties
       )
+      val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
+
+      stream.keyBy(new KeySelector[Array[AnyRef], AnyRef] {
+        override def getKey(value: Array[AnyRef]): AnyRef = value(3) // return hour_
+      }).addSink(streamSink)
+
+      try environment.execute
+      catch {
+        case exception: Exception =>
+          // TODO
+          throw new UnsupportedOperationException(exception)
+      }
+
+      val dataLocation = dataPath + "default" + CarbonCommonConstants.FILE_SEPARATOR +
+                         tableName + CarbonCommonConstants.FILE_SEPARATOR
+
+      assertResult(true)(FileFactory.isFileExist(dataLocation))
+      assertResult(false)(FileFactory
+        .getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
 
       sql(s"INSERT INTO $tableName STAGE")
 
-      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(10000)))
+      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
 
     } finally {
       sql(s"drop table if exists $tableName").collect()
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 67c7bab..9195863 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -64,7 +64,7 @@
       environment.enableCheckpointing(2000L)
       environment.setRestartStrategy(RestartStrategies.noRestart)
 
-      val dataCount = 10000
+      val dataCount = 1000
       val source = new TestSource(dataCount) {
         @throws[InterruptedException]
         override def get(index: Int): Array[AnyRef] = {
@@ -103,7 +103,7 @@
 
       sql(s"INSERT INTO $tableName STAGE")
 
-      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(10000)))
+      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
 
       // ensure the stage snapshot file and all stage files are deleted
       assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
@@ -116,9 +116,9 @@
   }
 
   private def newWriterProperties(
-                                   dataTempPath: String,
-                                   dataPath: String,
-                                   storeLocation: String) = {
+    dataTempPath: String,
+    dataPath: String,
+    storeLocation: String) = {
     val properties = new Properties
     properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
     properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index f7b8668..bb5e946 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -26,15 +26,17 @@
 import org.apache.spark.{Accumulator, CarbonInputMetrics, DataSkewRangePartitioner, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, StructField, StructType}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -99,7 +101,7 @@
       ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
       DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
         convertStepRowCounter)
-    }.filter(_ != null)// Filter the bad record
+    }.filter(_ != null) // Filter the bad record
 
     // 3. Sort
     val configuration = DataLoadProcessBuilder.createConfiguration(model)
@@ -269,7 +271,7 @@
     val configuration = DataLoadProcessBuilder.createConfiguration(model)
     val header = configuration.getHeader
     val rangeColumn = model.getRangePartitionColumn
-    val rangeColumnIndex = (0 until header.length).find{
+    val rangeColumnIndex = (0 until header.length).find {
       index =>
         header(index).equalsIgnoreCase(rangeColumn.getColName)
     }.get
@@ -427,7 +429,7 @@
       .map(_.getColName)
       .toArray
     val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
-    val rdd: RDD[Row] = new CarbonScanRDD[CarbonRow](
+    val rdd: RDD[InternalRow] = new CarbonScanRDD[CarbonRow](
       sparkSession,
       columnProjection = new CarbonProjection(columns),
       null,
@@ -436,13 +438,13 @@
       carbonTable.getTableInfo,
       new CarbonInputMetrics,
       null,
-      null,
+      classOf[SparkDataTypeConverterImpl],
       classOf[CarbonRowReadSupport],
       splits.asJava)
       .map { row =>
-        new GenericRow(row.getData.asInstanceOf[Array[Any]])
+        new GenericInternalRow(row.getData.asInstanceOf[Array[Any]])
       }
-    sparkSession.createDataFrame(rdd, schema)
+    SparkSQLUtil.execute(rdd, schema, sparkSession)
   }
 }
 
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 8f39f9b..13e7c45 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -22,13 +22,17 @@
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.EmptyRule
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, Cast, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
 
 object SparkSQLUtil {
@@ -38,6 +42,10 @@
     Dataset.ofRows(sparkSession, logicalPlan)
   }
 
+  def execute(rdd: RDD[InternalRow], schema: StructType, sparkSession: SparkSession): DataFrame = {
+    execute(LogicalRDD(schema.toAttributes, rdd)(sparkSession), sparkSession)
+  }
+
   def getSparkSession: SparkSession = {
     SparkSession.getDefaultSession.get
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index eb63d03..24e7765 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -32,12 +32,12 @@
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -85,7 +85,6 @@
     val tablePath = table.getTablePath
     val stagePath = CarbonTablePath.getStageDir(tablePath)
     val snapshotFilePath = CarbonTablePath.getStageSnapshotFile(tablePath)
-    var loadModel: CarbonLoadModel = null
     val lock = acquireIngestLock(table)
 
     try {
@@ -133,44 +132,21 @@
       val executorService = Executors.newFixedThreadPool(numThreads)
       val stageInputs = collectStageInputs(executorService, stageFiles)
 
-      // 3) add new segment with INSERT_IN_PROGRESS into table status
-      loadModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table)
-      CarbonLoaderUtil.recordNewLoadMetadata(loadModel)
-
-      // 4) write all existing stage file names and segmentId into a new snapshot file
-      // The content of snapshot file is: first line is segmentId, followed by each line is
-      // one stage file name
-      val content =
-        (Seq(loadModel.getSegmentId) ++ stageFiles.map(_._1.getAbsolutePath)).mkString("\n")
-      FileFactory.writeFile(content, snapshotFilePath)
-
-      // 5) perform data loading
+      // 3) perform data loading
       if (table.isHivePartitionTable) {
-        startLoadingWithPartition(spark, table, loadModel, stageInputs)
+        startLoadingWithPartition(spark, table, stageInputs, stageFiles, snapshotFilePath)
       } else {
-        startLoading(spark, table, loadModel, stageInputs)
+        startLoading(spark, table, stageInputs, stageFiles, snapshotFilePath)
       }
 
-      // 6) write segment file and update the segment entry to SUCCESS
-      val segmentFileName = SegmentFileStore.writeSegmentFile(
-        table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
-      SegmentFileStore.updateTableStatusFile(
-        table, loadModel.getSegmentId, segmentFileName,
-        table.getCarbonTableIdentifier.getTableId,
-        new SegmentFileStore(table.getTablePath, segmentFileName),
-        SegmentStatus.SUCCESS)
-
-      // 7) delete stage files
+      // 4) delete stage files
       deleteStageFiles(executorService, stageFiles)
 
-      // 8) delete the snapshot file
+      // 5) delete the snapshot file
       FileFactory.getCarbonFile(snapshotFilePath).delete()
     } catch {
       case ex: Throwable =>
         LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
-        if (loadModel != null) {
-          CarbonLoaderUtil.updateTableStatusForFailure(loadModel)
-        }
         throw ex
     } finally {
       lock.unlock()
@@ -266,24 +242,55 @@
   private def startLoading(
       spark: SparkSession,
       table: CarbonTable,
-      loadModel: CarbonLoadModel,
-      stageInput: Seq[StageInput]
+      stageInput: Seq[StageInput],
+      stageFiles: Array[(CarbonFile, CarbonFile)],
+      snapshotFilePath: String
   ): Unit = {
-    val splits = stageInput.flatMap(_.createSplits().asScala)
-    LOGGER.info(s"start to load ${splits.size} files into " +
-                s"${table.getDatabaseName}.${table.getTableName}")
-    val start = System.currentTimeMillis()
-    val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
-    DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
-      spark,
-      Option(dataFrame),
-      loadModel,
-      SparkSQLUtil.sessionState(spark).newHadoopConf()
-    ).map { row =>
-        (row._1, FailureCauses.NONE == row._2._2.failureCauses)
-    }
+    var loadModel: CarbonLoadModel = null
+    try {
+      // 1) add new segment with INSERT_IN_PROGRESS into table status
+      loadModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(spark, table)
+      CarbonLoaderUtil.recordNewLoadMetadata(loadModel)
 
-    LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
+      // 2) write all existing stage file names and segmentId into a new snapshot file
+      // The content of snapshot file is: first line is segmentId, followed by each line is
+      // one stage file name
+      val content =
+      (Seq(loadModel.getSegmentId) ++ stageFiles.map(_._1.getAbsolutePath)).mkString("\n")
+      FileFactory.writeFile(content, snapshotFilePath)
+
+      // 3) do loading.
+      val splits = stageInput.flatMap(_.createSplits().asScala)
+      LOGGER.info(s"start to load ${splits.size} files into " +
+                  s"${table.getDatabaseName}.${table.getTableName}")
+      val start = System.currentTimeMillis()
+      val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
+      DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+        spark,
+        Option(dataFrame),
+        loadModel,
+        SparkSQLUtil.sessionState(spark).newHadoopConf()
+      ).map { row =>
+          (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+      }
+      LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
+
+      // 4) write segment file and update the segment entry to SUCCESS
+      val segmentFileName = SegmentFileStore.writeSegmentFile(
+        table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
+      SegmentFileStore.updateTableStatusFile(
+        table, loadModel.getSegmentId, segmentFileName,
+        table.getCarbonTableIdentifier.getTableId,
+        new SegmentFileStore(table.getTablePath, segmentFileName),
+        SegmentStatus.SUCCESS)
+    } catch {
+      case ex: Throwable =>
+        LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
+        if (loadModel != null) {
+          CarbonLoaderUtil.updateTableStatusForFailure(loadModel)
+        }
+        throw ex
+    }
   }
 
   /**
@@ -292,15 +299,18 @@
   private def startLoadingWithPartition(
       spark: SparkSession,
       table: CarbonTable,
-      loadModel: CarbonLoadModel,
-      stageInput: Seq[StageInput]
+      stageInput: Seq[StageInput],
+      stageFiles: Array[(CarbonFile, CarbonFile)],
+      snapshotFilePath: String
     ): Unit = {
     val partitionDataList = listPartitionFiles(stageInput)
+
+    val content = stageFiles.map(_._1.getAbsolutePath).mkString("\n")
+    FileFactory.writeFile(content, snapshotFilePath)
+
     val start = System.currentTimeMillis()
-    var index = 0
     partitionDataList.map {
       case (partition, splits) =>
-        index = index + 1
         LOGGER.info(s"start to load ${splits.size} files into " +
           s"${table.getDatabaseName}.${table.getTableName}. " +
           s"Partition information: ${partition.mkString(",")}")
@@ -484,22 +494,20 @@
       .map(_.getColName)
       .toArray
     val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
-    val rdd: RDD[Row] = new CarbonScanRDD[InternalRow](
-        sparkSession,
-        columnProjection = new CarbonProjection(columns),
-        null,
-        carbonTable.getAbsoluteTableIdentifier,
-        carbonTable.getTableInfo.serialize,
-        carbonTable.getTableInfo,
-        new CarbonInputMetrics,
-        null,
-        null,
-        classOf[SparkRowReadSupportImpl],
-        splits.asJava
-      ).map { row =>
-        new GenericRow(row.toSeq(schema).toArray)
-      }
-    sparkSession.createDataFrame(rdd, schema)
+    val rdd: RDD[InternalRow] = new CarbonScanRDD[InternalRow](
+      sparkSession,
+      columnProjection = new CarbonProjection(columns),
+      null,
+      carbonTable.getAbsoluteTableIdentifier,
+      carbonTable.getTableInfo.serialize,
+      carbonTable.getTableInfo,
+      new CarbonInputMetrics,
+      null,
+      classOf[SparkDataTypeConverterImpl],
+      classOf[SparkRowReadSupportImpl],
+      splits.asJava
+    )
+    SparkSQLUtil.execute(rdd, schema, sparkSession)
   }
 
   override protected def opName: String = "INSERT STAGE"
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 0309e91..1334178 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -52,7 +52,6 @@
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.encoder.Encoding
@@ -662,34 +661,14 @@
       curAttributes: Seq[AttributeReference],
       sortScope: SortScopeOptions.SortScope,
       isDataFrame: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
+    val catalogAttributes = catalogTable.schema.toAttributes
     // Converts the data as per the loading steps before give it to writer or sorter
-    val convertedRdd = convertData(
+    val updatedRdd = convertData(
       rdd,
       sparkSession,
       loadModel,
       isDataFrame,
       partitionValues)
-    val updatedRdd = if (isDataFrame) {
-      val columnCount = loadModel.getCsvHeaderColumns.length
-      convertedRdd.map { row =>
-        val array = new Array[AnyRef](columnCount)
-        val data = row.getData
-        var i = 0
-        while (i < columnCount) {
-          data(i) match {
-            case string: String =>
-              array(i) = UTF8String.fromString(string)
-            case _ =>
-              array(i) = data(i)
-          }
-          i = i + 1
-        }
-        array
-      }.map(row => InternalRow.fromSeq(row))
-    } else {
-      convertedRdd.map(row => InternalRow.fromSeq(row.getData))
-    }
-    val catalogAttributes = catalogTable.schema.toAttributes
     var attributes = curAttributes.map(a => {
       catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get
     })
@@ -783,7 +762,7 @@
       sparkSession: SparkSession,
       model: CarbonLoadModel,
       isDataFrame: Boolean,
-      partitionValues: Array[String]): RDD[CarbonRow] = {
+      partitionValues: Array[String]): RDD[InternalRow] = {
     val sc = sparkSession.sparkContext
     val info =
       model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
@@ -827,7 +806,7 @@
           partialSuccessAccum,
           inputStepRowCounter,
           keepActualData = true)
-      }.filter(_ != null)
+      }.filter(_ != null).map(row => InternalRow.fromSeq(row.getData))
 
     finalRDD
   }