[CARBONDATA-3728] Fix insert failure on partition table with local sort

Why is this PR needed?

In the new insert flow, partition column data is maintained at the end of the row from the convert step until the write step.

But when local sort happens before the write step, the mapping is derived based on the original internal order instead of the partition internal order. Hence, insert fails during sorting.

What changes were proposed in this PR?

Use internal partition order instead of internal order.

Support 1.1 compatibility

Avoid impacting the sort step of the partition load flow.

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #3645
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index 631429e..ae6507c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -30,6 +30,7 @@
 import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 public class TableSpec {
 
@@ -56,10 +57,40 @@
   private int[] dictDimActualPosition;
   private int[] noDictDimActualPosition;
 
-  public TableSpec(CarbonTable carbonTable) {
+  public TableSpec(CarbonTable carbonTable, boolean keepPartitionColumnsToEnd) {
     this.carbonTable = carbonTable;
     List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
     List<CarbonMeasure> measures = carbonTable.getVisibleMeasures();
+    if (keepPartitionColumnsToEnd && carbonTable.getPartitionInfo() != null) {
+      // keep the partition columns in the end
+      List<CarbonDimension> reArrangedDimensions = new ArrayList<>();
+      List<CarbonMeasure> reArrangedMeasures = new ArrayList<>();
+      List<CarbonDimension> partitionDimensions = new ArrayList<>();
+      List<CarbonMeasure> partitionMeasures = new ArrayList<>();
+      List<ColumnSchema> columnSchemaList = carbonTable.getPartitionInfo().getColumnSchemaList();
+      for (CarbonDimension dim : dimensions) {
+        if (columnSchemaList.contains(dim.getColumnSchema())) {
+          partitionDimensions.add(dim);
+        } else {
+          reArrangedDimensions.add(dim);
+        }
+      }
+      if (partitionDimensions.size() != 0) {
+        reArrangedDimensions.addAll(partitionDimensions);
+      }
+      for (CarbonMeasure measure : measures) {
+        if (columnSchemaList.contains(measure.getColumnSchema())) {
+          partitionMeasures.add(measure);
+        } else {
+          reArrangedMeasures.add(measure);
+        }
+      }
+      if (partitionMeasures.size() != 0) {
+        reArrangedMeasures.addAll(partitionMeasures);
+      }
+      dimensions = reArrangedDimensions;
+      measures = reArrangedMeasures;
+    }
     // first calculate total number of columnar field considering column group and complex column
     numSimpleDimensions = 0;
     for (CarbonDimension dimension : dimensions) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index f76a3db..a0b2676 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -61,6 +61,7 @@
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.indexserver.DistributedRDDUtils
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -609,7 +610,8 @@
       optionsOriginal: mutable.Map[String, String],
       currPartitions: util.List[PartitionSpec]): LogicalRelation = {
     val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
-    val metastoreSchema = if (optionsOriginal.contains("no_rearrange_of_rows")) {
+    val metastoreSchema =
+      if (optionsOriginal.contains(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS)) {
       StructType(catalogTable.schema.fields.map{f =>
         val column = table.getColumnByName(f.name)
         val updatedDataType = if (column.getDataType ==
@@ -694,7 +696,7 @@
       fileFormat = new SparkCarbonTableFormat,
       options = options.toMap)(sparkSession = sparkSession)
 
-    if (options.contains("no_rearrange_of_rows")) {
+    if (options.contains(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS)) {
       CarbonReflectionUtils.getLogicalRelation(hdfsRelation,
         metastoreSchema.toAttributes,
         Some(catalogTable),
@@ -980,7 +982,7 @@
       }
       val opt = collection.mutable.Map() ++ loadParams.optionsOriginal
       if (loadParams.scanResultRDD.isDefined) {
-        opt += (("no_rearrange_of_rows", "true"))
+        opt += ((DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS, "true"))
       }
       // Create and ddd the segment to the tablestatus.
       CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadParams.carbonLoadModel,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index d93b079..da46177 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -51,6 +51,7 @@
 import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.CarbonBadRecordUtil
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
@@ -134,7 +135,7 @@
       model,
       conf)
     CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
-    if (options.contains("no_rearrange_of_rows")) {
+    if (options.contains(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS)) {
       model.setLoadWithoutConverterWithoutReArrangeStep(true)
     } else {
       model.setLoadWithoutConverterStep(true)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index d54b07b..4df678b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -502,6 +502,51 @@
     assert(result.get(0).get(7).equals(dataAndIndexSize._2))
   }
 
+  test("test partition with all sort scope") {
+    sql("drop table if exists origin_csv")
+    sql(
+      s"""
+         | create table origin_csv(col1 int, col2 string, col3 date)
+         | using csv
+         | options('dateFormat'='yyyy-MM-dd', 'timestampFormat'='yyyy-MM-dd HH:mm:ss')
+         | """.stripMargin)
+    sql("insert into origin_csv select 1, '3aa', to_date('2019-11-11')")
+    sql("insert into origin_csv select 2, '2bb', to_date('2019-11-12')")
+    sql("insert into origin_csv select 3, '1cc', to_date('2019-11-13')")
+    verifyInsertForPartitionTable("tbl_p_ns", "no_sort")
+    verifyInsertForPartitionTable("tbl_p_ls", "local_sort")
+    verifyInsertForPartitionTable("tbl_p_gs", "global_sort")
+    sql("drop table origin_csv")
+  }
+
+  def verifyInsertForPartitionTable(tableName: String, sort_scope: String): Unit = {
+    sql(s"drop table if exists $tableName")
+    sql(
+      s"""
+         | create table $tableName (
+         | col1 int,
+         | col2 string,
+         | col3 date,
+         | col4 timestamp,
+         | col5 float
+         | )
+         | using carbondata
+         | options('dateFormat'='yyyy-MM-dd', 'timestampFormat'='yyyy-MM-dd HH:mm:ss',
+         | 'sort_scope'='${ sort_scope }', 'sort_columns'='col2')
+         | partitioned by(col3, col4)
+     """.stripMargin)
+    sql(
+      s"""
+         | insert into $tableName (
+         |  select col1, col2, 1.2, col3, to_timestamp('2019-02-02 13:01:01') from origin_csv
+         |  union all
+         |  select 123,'abc', 1.2, to_date('2019-01-01'), to_timestamp('2019-02-02 13:01:01'))
+         |  """.stripMargin
+    )
+    checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(4)))
+    sql(s"drop table $tableName")
+  }
+
   def getDataAndIndexSize(path: String): (String, String) = {
     val mergeIndexFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 1af4fe3..7d0d87a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -250,6 +250,23 @@
     return type;
   }
 
+  public DataType[] getMeasureDataTypeAsDataFieldOrder() {
+    // same as data fields order
+    List<Integer> measureIndexes = new ArrayList<>(dataFields.length);
+    int measureCount = 0;
+    for (int i = 0; i < dataFields.length; i++) {
+      if (!dataFields[i].getColumn().isDimension()) {
+        measureIndexes.add(i);
+        measureCount++;
+      }
+    }
+    DataType[] type = new DataType[measureCount];
+    for (int i = 0; i < type.length; i++) {
+      type[i] = dataFields[measureIndexes.get(i)].getColumn().getDataType();
+    }
+    return type;
+  }
+
   /**
    * Get the data types of the no dictionary and the complex dimensions of the table
    *
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 8586a61..75af485 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -238,7 +238,7 @@
     configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER,
         loadModel.getBinaryDecoder());
     if (loadModel.isLoadWithoutConverterWithoutReArrangeStep()) {
-      configuration.setDataLoadProperty("no_rearrange_of_rows",
+      configuration.setDataLoadProperty(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS,
           loadModel.isLoadWithoutConverterWithoutReArrangeStep());
     }
     List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
@@ -267,7 +267,7 @@
     if (carbonTable.isHivePartitionTable()) {
       configuration.setWritingCoresCount((short) 1);
     }
-    TableSpec tableSpec = new TableSpec(carbonTable);
+    TableSpec tableSpec = new TableSpec(carbonTable, false);
     configuration.setTableSpec(tableSpec);
     if (loadModel.getSdkWriterCores() > 0) {
       configuration.setWritingCoresCount(loadModel.getSdkWriterCores());
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
index dbd9048..c7ef81b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
@@ -38,4 +38,6 @@
 
   public static final String FACT_FILE_PATH = "FACT_FILE_PATH";
 
+  // to indicate that it is optimized insert flow without rearrange of each data rows
+  public static final String NO_REARRANGE_OF_ROWS = "NO_REARRANGE_OF_ROWS";
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 0f5b203..635b3b4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -44,6 +44,7 @@
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
@@ -137,7 +138,8 @@
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
           .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID,
               System.currentTimeMillis());
-      if (configuration.getDataLoadProperty("no_rearrange_of_rows") != null) {
+      if (configuration.getDataLoadProperty(
+          DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS) != null) {
         initializeNoReArrangeIndexes();
       }
       if (iterators.length == 1) {
@@ -363,7 +365,8 @@
   private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler, int iteratorIndex)
       throws CarbonDataLoadingException {
     try {
-      if (configuration.getDataLoadProperty("no_rearrange_of_rows") != null) {
+      if (configuration.getDataLoadProperty(
+          DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS) != null) {
         // convert without re-arrange
         while (batch.hasNext()) {
           CarbonRow row = batch.next();
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index cb95226..9d41854 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -30,6 +30,7 @@
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 import org.apache.commons.lang3.StringUtils;
@@ -158,6 +159,8 @@
    */
   private int[] noDictSortColumnSchemaOrderMapping;
 
+  private boolean isInsertWithoutReArrangeFlow;
+
   public SortParameters getCopy() {
     SortParameters parameters = new SortParameters();
     parameters.tempFileLocation = tempFileLocation;
@@ -196,6 +199,7 @@
     parameters.dictDimActualPosition = dictDimActualPosition;
     parameters.noDictActualPosition = noDictActualPosition;
     parameters.noDictSortColumnSchemaOrderMapping = noDictSortColumnSchemaOrderMapping;
+    parameters.isInsertWithoutReArrangeFlow = isInsertWithoutReArrangeFlow;
     return parameters;
   }
 
@@ -407,6 +411,14 @@
     this.noDictSortColumnSchemaOrderMapping = noDictSortColumnSchemaOrderMapping;
   }
 
+  public boolean isInsertWithoutReArrangeFlow() {
+    return isInsertWithoutReArrangeFlow;
+  }
+
+  public void setInsertWithoutReArrangeFlow(boolean insertWithoutReArrangeFlow) {
+    isInsertWithoutReArrangeFlow = insertWithoutReArrangeFlow;
+  }
+
   public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) {
     SortParameters parameters = new SortParameters();
     CarbonTableIdentifier tableIdentifier =
@@ -431,10 +443,6 @@
         CarbonDataProcessorUtil.getIsVarcharColumnMapping(configuration.getDataFields()));
     parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
     parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
-    parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
-        .getNoDictSortColMapping(parameters.getCarbonTable()));
-    parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
-        .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
     parameters.setSortColumn(configuration.getSortColumnMapping());
     parameters.setObserver(new SortObserver());
     // get sort buffer size
@@ -482,18 +490,46 @@
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
 
-    DataType[] measureDataType = configuration.getMeasureDataType();
-    parameters.setMeasureDataType(measureDataType);
-    parameters.setNoDictDataType(CarbonDataProcessorUtil
-        .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable()));
-    Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
-        .getNoDictSortAndNoSortDataTypes(configuration.getTableSpec().getCarbonTable());
-    parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
-    parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
-    parameters.setNoDictActualPosition(configuration.getTableSpec().getNoDictDimActualPosition());
-    parameters.setDictDimActualPosition(configuration.getTableSpec().getDictDimActualPosition());
-    parameters.setUpdateDictDims(configuration.getTableSpec().isUpdateDictDim());
-    parameters.setUpdateNonDictDims(configuration.getTableSpec().isUpdateNoDictDims());
+    if (configuration.getDataLoadProperty(DataLoadProcessorConstants.NO_REARRANGE_OF_ROWS) != null
+        && configuration.getTableSpec().getCarbonTable().getPartitionInfo() != null) {
+      // In case of partition, partition data will be present in the end for rearrange flow
+      // So, prepare the indexes and mapping as per dataFields order.
+      parameters.setInsertWithoutReArrangeFlow(true);
+      parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
+          .getNoDictSortColMappingAsDataFieldOrder(configuration.getDataFields()));
+      parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
+          .getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(configuration.getDataFields()));
+      parameters.setMeasureDataType(configuration.getMeasureDataTypeAsDataFieldOrder());
+      parameters.setNoDictDataType(CarbonDataProcessorUtil
+          .getNoDictDataTypesAsDataFieldOrder(configuration.getDataFields()));
+      Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
+          .getNoDictSortAndNoSortDataTypesAsDataFieldOrder(configuration.getDataFields());
+      parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
+      parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
+      // keep partition columns in the end for table spec by getting rearranged table spec
+      TableSpec tableSpec = new TableSpec(configuration.getTableSpec().getCarbonTable(), true);
+      parameters.setNoDictActualPosition(tableSpec.getNoDictDimActualPosition());
+      parameters.setDictDimActualPosition(tableSpec.getDictDimActualPosition());
+      parameters.setUpdateDictDims(tableSpec.isUpdateDictDim());
+      parameters.setUpdateNonDictDims(tableSpec.isUpdateNoDictDims());
+    } else {
+      parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
+          .getNoDictSortColMapping(parameters.getCarbonTable()));
+      parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
+          .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
+      parameters.setMeasureDataType(configuration.getMeasureDataType());
+      parameters.setNoDictDataType(CarbonDataProcessorUtil
+          .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable()));
+      Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
+          .getNoDictSortAndNoSortDataTypes(configuration.getTableSpec().getCarbonTable());
+      parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
+      parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
+      TableSpec tableSpec = configuration.getTableSpec();
+      parameters.setNoDictActualPosition(tableSpec.getNoDictDimActualPosition());
+      parameters.setDictDimActualPosition(tableSpec.getDictDimActualPosition());
+      parameters.setUpdateDictDims(tableSpec.isUpdateDictDim());
+      parameters.setUpdateNonDictDims(tableSpec.isUpdateNoDictDims());
+    }
     return parameters;
   }
 
@@ -579,7 +615,7 @@
         .getNoDictSortColMapping(parameters.getCarbonTable()));
     parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
         .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
-    TableSpec tableSpec = new TableSpec(carbonTable);
+    TableSpec tableSpec = new TableSpec(carbonTable, false);
     parameters.setNoDictActualPosition(tableSpec.getNoDictDimActualPosition());
     parameters.setDictDimActualPosition(tableSpec.getDictDimActualPosition());
     parameters.setUpdateDictDims(tableSpec.isUpdateDictDim());
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index 9f540a1..3ffc591 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -24,7 +24,10 @@
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.processing.sort.DummyRowUpdater;
 import org.apache.carbondata.processing.sort.SchemaBasedRowUpdater;
 import org.apache.carbondata.processing.sort.SortTempRowUpdater;
@@ -120,39 +123,68 @@
     int tmpDictNoSortCnt = 0;
     int tmpVarcharCnt = 0;
     int tmpComplexcount = 0;
+    int tmpMeasureIndex = 0;
 
-    List<CarbonDimension> allDimensions = sortParameters.getCarbonTable().getVisibleDimensions();
-    List<CarbonDimension> updatedDimensions = updateDimensionsBasedOnSortColumns(allDimensions);
-    for (int i = 0; i < updatedDimensions.size(); i++) {
-      CarbonDimension carbonDimension = updatedDimensions.get(i);
-      if (carbonDimension.getDataType() == DataTypes.DATE && !carbonDimension.isComplex()) {
-        if (carbonDimension.isSortColumn()) {
-          dictSortDimIdx[tmpDictSortCnt++] = i;
+    if (sortParameters.isInsertWithoutReArrangeFlow()
+        && sortParameters.getCarbonTable().getPartitionInfo() != null) {
+      List<ColumnSchema> reArrangedColumnSchema =
+          getReArrangedColumnSchema(sortParameters.getCarbonTable());
+      for (int i = 0; i < reArrangedColumnSchema.size(); i++) {
+        ColumnSchema columnSchema = reArrangedColumnSchema.get(i);
+        if (columnSchema.isDimensionColumn()) {
+          if (columnSchema.getDataType() == DataTypes.DATE && !columnSchema.getDataType()
+              .isComplexType()) {
+            if (columnSchema.isSortColumn()) {
+              dictSortDimIdx[tmpDictSortCnt++] = i;
+            } else {
+              dictNoSortDimIdx[tmpDictNoSortCnt++] = i;
+            }
+          } else if (!columnSchema.getDataType().isComplexType()) {
+            if (columnSchema.getDataType() == DataTypes.VARCHAR) {
+              varcharDimIdx[tmpVarcharCnt++] = i;
+            } else if (columnSchema.isSortColumn()) {
+              noDictSortDimIdx[tmpNoDictSortCnt++] = i;
+            } else {
+              noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
+            }
+          } else {
+            complexDimIdx[tmpComplexcount++] = i;
+          }
         } else {
-          dictNoSortDimIdx[tmpDictNoSortCnt++] = i;
+          measureIdx[tmpMeasureIndex++] = i;
         }
-      } else if (!carbonDimension.isComplex()) {
-        if (isVarcharDimFlags[i]) {
-          varcharDimIdx[tmpVarcharCnt++] = i;
-        } else if (carbonDimension.isSortColumn()) {
-          noDictSortDimIdx[tmpNoDictSortCnt++] = i;
+      }
+    } else {
+      List<CarbonDimension> allDimensions = sortParameters.getCarbonTable().getVisibleDimensions();
+      List<CarbonDimension> updatedDimensions = updateDimensionsBasedOnSortColumns(allDimensions);
+      for (int i = 0; i < updatedDimensions.size(); i++) {
+        CarbonDimension carbonDimension = updatedDimensions.get(i);
+        if (carbonDimension.getDataType() == DataTypes.DATE && !carbonDimension.isComplex()) {
+          if (carbonDimension.isSortColumn()) {
+            dictSortDimIdx[tmpDictSortCnt++] = i;
+          } else {
+            dictNoSortDimIdx[tmpDictNoSortCnt++] = i;
+          }
+        } else if (!carbonDimension.isComplex()) {
+          if (isVarcharDimFlags[i]) {
+            varcharDimIdx[tmpVarcharCnt++] = i;
+          } else if (carbonDimension.isSortColumn()) {
+            noDictSortDimIdx[tmpNoDictSortCnt++] = i;
+          } else {
+            noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
+          }
         } else {
-          noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
+          complexDimIdx[tmpComplexcount++] = i;
         }
-      } else {
-        complexDimIdx[tmpComplexcount++] = i;
+      }
+      int base = updatedDimensions.size();
+      // indices for measure columns
+      for (int i = 0; i < measureCnt; i++) {
+        measureIdx[i] = base + i;
       }
     }
-
     dictNoSortDimCnt = tmpDictNoSortCnt;
     noDictNoSortDimCnt = tmpNoDictNoSortCnt;
-
-    int base = updatedDimensions.size();
-
-    // indices for measure columns
-    for (int i = 0; i < measureCnt; i++) {
-      measureIdx[i] = base + i;
-    }
     if (sortParameters.isUpdateDictDims() || sortParameters.isUpdateNonDictDims()) {
       this.sortTempRowUpdater = new SchemaBasedRowUpdater(sortParameters.getDictDimActualPosition(),
           sortParameters.getNoDictActualPosition(), sortParameters.isUpdateDictDims(),
@@ -295,4 +327,29 @@
     updatedDataFields.addAll(nonSortFields);
     return updatedDataFields;
   }
+
+  private static List<ColumnSchema> getReArrangedColumnSchema(
+      CarbonTable carbonTable) {
+    // handle 1.1 compatibility for sort columns
+    List<CarbonDimension> visibleDimensions =
+        updateDimensionsBasedOnSortColumns(carbonTable.getVisibleDimensions());
+    List<CarbonMeasure> visibleMeasures = carbonTable.getVisibleMeasures();
+    List<ColumnSchema> otherCols = new ArrayList<>();
+    if (carbonTable.getPartitionInfo() != null) {
+      List<ColumnSchema> columnSchemaList = carbonTable.getPartitionInfo().getColumnSchemaList();
+      for (CarbonDimension dim : visibleDimensions) {
+        if (!columnSchemaList.contains(dim.getColumnSchema())) {
+          otherCols.add(dim.getColumnSchema());
+        }
+      }
+      for (CarbonMeasure measure : visibleMeasures) {
+        if (!columnSchemaList.contains(measure.getColumnSchema())) {
+          otherCols.add(measure.getColumnSchema());
+        }
+      }
+      otherCols.addAll(columnSchemaList);
+    }
+    return otherCols;
+  }
+
 }
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index a95bf15..5ab3061 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -304,7 +304,7 @@
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
     carbonFactDataHandlerModel.setColumnCompressor(loadModel.getColumnCompressor());
 
-    carbonFactDataHandlerModel.tableSpec = new TableSpec(carbonTable);
+    carbonFactDataHandlerModel.tableSpec = new TableSpec(carbonTable, false);
     DataMapWriterListener listener = new DataMapWriterListener();
     listener.registerAllWriter(
         carbonTable,
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index c159d18..a3fbb0d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -376,6 +376,25 @@
   }
 
   /**
+   * get visible no dictionary dimensions as per data field order
+   *
+   * @param dataFields
+   * @return
+   */
+  public static DataType[] getNoDictDataTypesAsDataFieldOrder(DataField[] dataFields) {
+    List<DataType> type = new ArrayList<>();
+    for (DataField dataField : dataFields) {
+      if (!dataField.getColumn().isInvisible() && dataField.getColumn().isDimension()) {
+        if (dataField.getColumn().getColumnSchema().isSortColumn()
+            && dataField.getColumn().getColumnSchema().getDataType() != DataTypes.DATE) {
+          type.add(dataField.getColumn().getColumnSchema().getDataType());
+        }
+      }
+    }
+    return type.toArray(new DataType[type.size()]);
+  }
+
+  /**
    * Get the no dictionary sort column mapping of the table
    *
    * @param carbonTable
@@ -402,6 +421,33 @@
   }
 
   /**
+   * get mapping based on data fields order
+   *
+   * @param dataFields
+   * @return
+   */
+  public static boolean[] getNoDictSortColMappingAsDataFieldOrder(DataField[] dataFields) {
+    List<Boolean> noDicSortColMap = new ArrayList<>();
+    for (DataField dataField : dataFields) {
+      if (!dataField.getColumn().isInvisible() && dataField.getColumn().isDimension()) {
+        if (dataField.getColumn().getColumnSchema().isSortColumn()) {
+          if (dataField.getColumn().getColumnSchema().getDataType() != DataTypes.DATE) {
+            noDicSortColMap.add(true);
+          } else {
+            noDicSortColMap.add(false);
+          }
+        }
+      }
+    }
+    Boolean[] mapping = noDicSortColMap.toArray(new Boolean[0]);
+    boolean[] noDicSortColMapping = new boolean[mapping.length];
+    for (int i = 0; i < mapping.length; i++) {
+      noDicSortColMapping[i] = mapping[i];
+    }
+    return noDicSortColMapping;
+  }
+
+  /**
    * If the dimension is added in older version 1.1, by default it will be sort column, So during
    * initial sorting, carbonrow will be in order where added sort column is at the beginning, But
    * before final merger of sort, the data should be in schema order
@@ -431,6 +477,37 @@
   }
 
   /**
+   * If the dimension is added in older version 1.1, by default it will be sort column, So during
+   * initial sorting, carbonrow will be in order where added sort column is at the beginning, But
+   * before final merger of sort, the data should be in schema order
+   * (org.apache.carbondata.processing.sort.SchemaBasedRowUpdater updates the carbonRow in schema
+   * order), so This method helps to find the index of no dictionary sort column in the carbonrow
+   * data.
+   */
+  public static int[] getColumnIdxBasedOnSchemaInRowAsDataFieldOrder(DataField[] dataFields) {
+    List<Integer> noDicSortColMap = new ArrayList<>();
+    int counter = 0;
+    for (DataField dataField : dataFields) {
+      if (!dataField.getColumn().isInvisible() && dataField.getColumn().isDimension()) {
+        if (dataField.getColumn().getColumnSchema().getDataType() == DataTypes.DATE) {
+          continue;
+        }
+        if (dataField.getColumn().getColumnSchema().isSortColumn() && DataTypeUtil
+            .isPrimitiveColumn(dataField.getColumn().getColumnSchema().getDataType())) {
+          noDicSortColMap.add(counter);
+        }
+        counter++;
+      }
+    }
+    Integer[] mapping = noDicSortColMap.toArray(new Integer[0]);
+    int[] columnIdxBasedOnSchemaInRow = new int[mapping.length];
+    for (int i = 0; i < mapping.length; i++) {
+      columnIdxBasedOnSchemaInRow[i] = mapping[i];
+    }
+    return columnIdxBasedOnSchemaInRow;
+  }
+
+  /**
    * Get the data types of the no dictionary sort columns
    *
    * @param carbonTable
@@ -458,6 +535,34 @@
   }
 
   /**
+   * Get the data types of the no dictionary sort columns as per dataFields order
+   *
+   * @param dataFields
+   * @return
+   */
+  public static Map<String, DataType[]> getNoDictSortAndNoSortDataTypesAsDataFieldOrder(
+      DataField[] dataFields) {
+    List<DataType> noDictSortType = new ArrayList<>();
+    List<DataType> noDictNoSortType = new ArrayList<>();
+    for (DataField dataField : dataFields) {
+      if (dataField.getColumn().isDimension()
+          && dataField.getColumn().getColumnSchema().getDataType() != DataTypes.DATE) {
+        if (dataField.getColumn().getColumnSchema().isSortColumn()) {
+          noDictSortType.add(dataField.getColumn().getColumnSchema().getDataType());
+        } else {
+          noDictNoSortType.add(dataField.getColumn().getColumnSchema().getDataType());
+        }
+      }
+    }
+    DataType[] noDictSortTypes = noDictSortType.toArray(new DataType[noDictSortType.size()]);
+    DataType[] noDictNoSortTypes = noDictNoSortType.toArray(new DataType[noDictNoSortType.size()]);
+    Map<String, DataType[]> noDictSortAndNoSortTypes = new HashMap<>(2);
+    noDictSortAndNoSortTypes.put("noDictSortDataTypes", noDictSortTypes);
+    noDictSortAndNoSortTypes.put("noDictNoSortDataTypes", noDictNoSortTypes);
+    return noDictSortAndNoSortTypes;
+  }
+
+  /**
    * This method will get the store location for the given path, segment id and partition id
    *
    * @return data directory path