[CARBONDATA-3524] Support global_sort compaction

[Background]

For a GLOBAL_SORT table, segments are currently compacted using LOCAL_SORT.

[Motivation]

Compacting with LOCAL_SORT can degrade query performance, because the merged segment loses its global sort order. It is better to compact with GLOBAL_SORT so that query performance is preserved.

[Limitation]

RANGE_COLUMN tables still use the old flow to load data, so they fall back to the existing compaction path.
Only standard tables are supported.

This closes #3389
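
Usage sketch (illustrative only, not part of the patch): a GLOBAL_SORT table now keeps
its sort scope through compaction, and GLOBAL_SORT_PARTITIONS can be set at create time
or changed later via ALTER TABLE. Table and file names below are made up.

    sql(
      """
        | CREATE TABLE sales(id INT, name STRING, city STRING, age INT)
        | STORED BY 'org.apache.carbondata.format'
        | TBLPROPERTIES('SORT_COLUMNS'='city,name',
        |               'SORT_SCOPE'='GLOBAL_SORT',
        |               'GLOBAL_SORT_PARTITIONS'='2')
      """.stripMargin)
    sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE sales")
    // GLOBAL_SORT_PARTITIONS is now an alterable property (see AlterTableUtil below):
    sql("ALTER TABLE sales SET TBLPROPERTIES('GLOBAL_SORT_PARTITIONS'='1')")
    // Compaction now re-sorts the merged segment globally instead of using LOCAL_SORT:
    sql("ALTER TABLE sales COMPACT 'MAJOR'")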
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
index bad8bdc..ed0f466 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -42,7 +42,7 @@
       """
         | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT)
         | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='1')
       """.stripMargin)
 
     sql("DROP TABLE IF EXISTS carbon_localsort")
@@ -68,6 +68,7 @@
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
     sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
 
+    sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
 
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
@@ -108,6 +109,7 @@
 
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
 
+    sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
 
     checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
@@ -138,6 +140,7 @@
 
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
 
+    sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
 
     checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
@@ -174,6 +177,7 @@
 
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
 
+    sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
 
     val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
@@ -206,6 +210,7 @@
 
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
 
+    sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
 
     checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
@@ -245,6 +250,7 @@
 
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
 
+    sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
     sql("clean files for table compaction_globalsort")
 
@@ -279,7 +285,7 @@
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
 
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
-
+    sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'minor'")
     sql("clean files for table compaction_globalsort")
 
@@ -312,6 +318,7 @@
 
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
 
+    sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
     sql("clean files for table compaction_globalsort")
 
@@ -347,6 +354,7 @@
 
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
 
+    sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
     sql("clean files for table compaction_globalsort")
 
@@ -378,7 +386,7 @@
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
 
     checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
-
+    sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
     sql("clean files for table compaction_globalsort")
     checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
@@ -438,6 +446,7 @@
       sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort")
       sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE compaction_globalsort")
     }
+    sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
 
     assert(getIndexFileCount("compaction_globalsort", "0.1") === 2)
@@ -453,6 +462,7 @@
       sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort")
       sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE compaction_globalsort")
     }
+    sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
 
     assert(getIndexFileCount("compaction_globalsort", "0.1") === 2)
@@ -479,6 +489,7 @@
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
     sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
 
+    sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
     sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
     sql("clean files for table compaction_globalsort")
 
@@ -503,7 +514,7 @@
       """
         | CREATE TABLE compaction_globalsort2(id INT, name STRING, city STRING, age INT)
         | STORED BY 'org.apache.carbondata.format'
-        |  TBLPROPERTIES('SORT_COLUMNS'='id','SORT_SCOPE'='GLOBAL_SORT')
+        |  TBLPROPERTIES('SORT_COLUMNS'='id','SORT_SCOPE'='GLOBAL_SORT', 'global_sort_partitions'='1')
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort2")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort2")
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
index 4b027b9..a1d0fe0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
@@ -43,7 +43,7 @@
       """
         | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT)
         | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='1')
       """.stripMargin)
 
     sql("DROP TABLE IF EXISTS carbon_localsort")
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 5eeee78..255f399 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -85,6 +85,9 @@
     sql("DROP TABLE IF EXISTS carbon_globalsort2")
     sql("DROP TABLE IF EXISTS carbon_globalsort_partitioned")
     sql("DROP TABLE IF EXISTS carbon_globalsort_difftypes")
+    sql("DROP TABLE IF EXISTS carbon_globalsort_minor")
+    sql("DROP TABLE IF EXISTS carbon_globalsort_major")
+    sql("DROP TABLE IF EXISTS carbon_globalsort_custom")
   }
 
   // ----------------------------------- Compare Result -----------------------------------
@@ -183,6 +186,95 @@
       sql("SELECT * FROM carbon_localsort_twice ORDER BY name, id"))
   }
 
+  test("Compaction GLOBAL_SORT: minor") {
+    sql("DROP TABLE IF EXISTS carbon_globalsort_minor")
+    sql(
+      """
+        | CREATE TABLE carbon_globalsort_minor(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES(
+        | 'SORT_SCOPE'='GLOBAL_SORT',
+        | 'sort_columns' = 'name, city',
+        | 'AUTO_LOAD_MERGE'='false',
+        | 'COMPACTION_LEVEL_THRESHOLD'='3,0', 'GLOBAL_SORT_PARTITIONS'='3')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_minor")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_minor")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_minor")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_minor")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+    assertResult(4)(sql("show segments for table carbon_globalsort_minor").count())
+    sql("ALTER TABLE carbon_globalsort_minor COMPACT 'MINOR'")
+    assertResult(5)(sql("show segments for table carbon_globalsort_minor").count())
+    assertResult(3)(
+      sql("show segments for table carbon_globalsort_minor").rdd.filter(_.get(1).equals("Compacted")).count())
+    assert(getIndexFileCount("carbon_globalsort_minor", "0.1") === 3)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort_minor"), Seq(Row(48)))
+    checkAnswer(sql("SELECT * FROM carbon_globalsort_minor ORDER BY name, id"),
+      sql("SELECT * FROM carbon_globalsort ORDER BY name, id"))
+  }
+
+  test("Compaction GLOBAL_SORT: major") {
+    sql("DROP TABLE IF EXISTS carbon_globalsort_major")
+    sql(
+      """
+        | CREATE TABLE carbon_globalsort_major(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES(
+        | 'SORT_SCOPE'='GLOBAL_SORT',
+        | 'sort_columns' = 'name, city',
+        | 'AUTO_LOAD_MERGE'='false',
+        | 'MAJOR_COMPACTION_SIZE'='1024', 'GLOBAL_SORT_PARTITIONS'='4')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_major")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_major")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_major")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_major")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+    assertResult(4)(sql("show segments for table carbon_globalsort_major").count())
+    sql("ALTER TABLE carbon_globalsort_major COMPACT 'major'")
+    assertResult(5)(sql("show segments for table carbon_globalsort_major").count())
+    assertResult(4)(
+      sql("show segments for table carbon_globalsort_major").rdd.filter(_.get(1).equals("Compacted")).count())
+    assert(getIndexFileCount("carbon_globalsort_major", "0.1") === 4)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort_major"), Seq(Row(48)))
+    checkAnswer(sql("SELECT * FROM carbon_globalsort_major ORDER BY name, id"),
+      sql("SELECT * FROM carbon_globalsort ORDER BY name, id"))
+  }
+
+  test("Compaction GLOBAL_SORT: custom") {
+    sql("DROP TABLE IF EXISTS carbon_globalsort_custom")
+    sql(
+      """
+        | CREATE TABLE carbon_globalsort_custom(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES(
+        | 'SORT_SCOPE'='GLOBAL_SORT',
+        | 'sort_columns' = 'name, city',
+        | 'AUTO_LOAD_MERGE'='false', 'GLOBAL_SORT_PARTITIONS'='3')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_custom")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_custom")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_custom")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_custom")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+    assertResult(4)(sql("show segments for table carbon_globalsort_custom").count())
+    sql("ALTER TABLE carbon_globalsort_custom COMPACT 'custom' WHERE SEGMENT.ID IN (0,1,2)")
+    assertResult(5)(sql("show segments for table carbon_globalsort_custom").count())
+    assertResult(3)(
+      sql("show segments for table carbon_globalsort_custom").rdd.filter(_.get(1).equals("Compacted")).count())
+    assert(getIndexFileCount("carbon_globalsort_custom", "0.1") === 3)
+    checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort_custom"), Seq(Row(48)))
+    checkAnswer(sql("SELECT * FROM carbon_globalsort_custom ORDER BY name, id"),
+      sql("SELECT * FROM carbon_globalsort_custom ORDER BY name, id"))
+  }
+
   // ----------------------------------- Check Configurations -----------------------------------
   // Waiting for merge SET feature[CARBONDATA-1065]
   ignore("DDL > SET") {
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
index 2ea3d43..100ad17 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
@@ -29,7 +29,7 @@
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema}
 
-private[spark] object SparkTypeConverter {
+object SparkTypeConverter {
 
   def createSparkSchema(table: CarbonTable, columns: Seq[String]): StructType = {
     Objects.requireNonNull(table)
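
Dropping private[spark] lets CarbonTableCompactor in the spark2 module reuse this
converter when it builds the intermediate DataFrame for compaction, roughly:

    val columns = carbonTable.getCreateOrderColumn(carbonTable.getTableName)
      .asScala.map(_.getColName).toArray
    val schema: StructType = SparkTypeConverter.createSparkSchema(carbonTable, columns)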
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 77b7119..bfbf52d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -24,21 +24,39 @@
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.SQLContext
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{InputSplit, Job}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
+import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 import org.apache.spark.util.MergeIndexUtil
+import org.apache.spark.CarbonInputMetrics
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.GenericRow
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.metadata.datatype.{StructField, StructType}
 import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.processing.util.TableOptionConstant
+import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
 import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.store.CarbonRowReadSupport
 
 /**
  * This class is used to perform compaction on carbon table.
@@ -195,6 +213,11 @@
           carbonLoadModel,
           carbonMergerMapping
         ).collect
+      } else if (SortScope.GLOBAL_SORT == carbonTable.getSortScope &&
+                 !carbonTable.getSortColumns.isEmpty &&
+                 carbonTable.getRangeColumn == null &&
+                 CarbonUtil.isStandardCarbonTable(carbonTable)) {
+        compactSegmentsByGlobalSort(sc.sparkSession, carbonLoadModel, carbonMergerMapping)
       } else {
         new CarbonMergerRDD(
           sc.sparkSession,
@@ -325,4 +348,117 @@
     }
   }
 
+  /**
+   * compact segments by global sort
+   */
+  def compactSegmentsByGlobalSort(
+      sparkSession: SparkSession,
+      carbonLoadModel: CarbonLoadModel,
+      carbonMergerMapping: CarbonMergerMapping): Array[(String, Boolean)] = {
+    val dataFrame = dataFrameOfSegments(
+      sparkSession,
+      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+      carbonMergerMapping.validSegments)
+    // generate LoadModel which can be used global_sort flow
+    val outputModel = getLoadModelForGlobalSort(
+      sparkSession, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+      carbonMergerMapping.validSegments)
+    outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
+    DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+      sparkSession,
+      Option(dataFrame),
+      outputModel,
+      SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+      .map { row =>
+        (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+      }
+  }
+
+  /**
+   * create DataFrame basing on specified segments
+   */
+  def dataFrameOfSegments(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      segments: Array[Segment]
+  ): DataFrame = {
+    val columns = carbonTable
+      .getCreateOrderColumn(carbonTable.getTableName)
+      .asScala
+      .map(_.getColName)
+      .toArray
+    val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
+    val rdd: RDD[Row] = new CarbonScanRDD[CarbonRow](
+      sparkSession,
+      columnProjection = new CarbonProjection(columns),
+      null,
+      carbonTable.getAbsoluteTableIdentifier,
+      carbonTable.getTableInfo.serialize,
+      carbonTable.getTableInfo,
+      new CarbonInputMetrics,
+      null,
+      null,
+      classOf[CarbonRowReadSupport],
+      splitsOfSegments(sparkSession, carbonTable, segments))
+      .map { row =>
+        new GenericRow(row.getData.asInstanceOf[Array[Any]])
+      }
+    sparkSession.createDataFrame(rdd, schema)
+  }
+
+  /**
+   * get splits of specified segments
+   */
+  def splitsOfSegments(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      segments: Array[Segment]
+  ): java.util.List[InputSplit] = {
+    val jobConf = new JobConf(SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val job = Job.getInstance(jobConf)
+    val conf = job.getConfiguration
+    CarbonInputFormat.setTablePath(conf, carbonTable.getTablePath)
+    CarbonInputFormat.setTableInfo(conf, carbonTable.getTableInfo)
+    CarbonInputFormat.setDatabaseName(conf, carbonTable.getDatabaseName)
+    CarbonInputFormat.setTableName(conf, carbonTable.getTableName)
+    CarbonInputFormat.setQuerySegment(conf, segments.map(_.getSegmentNo).mkString(","))
+    new CarbonTableInputFormat[Object].getSplits(job)
+  }
+
+  /**
+   * create CarbonLoadModel for global_sort compaction
+   */
+  def getLoadModelForGlobalSort(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      segments: Array[Segment]
+  ): CarbonLoadModel = {
+    val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
+    CarbonTableOutputFormat.setDatabaseName(conf, carbonTable.getDatabaseName)
+    CarbonTableOutputFormat.setTableName(conf, carbonTable.getTableName)
+    CarbonTableOutputFormat.setCarbonTable(conf, carbonTable)
+    val fieldList = carbonTable
+      .getCreateOrderColumn(carbonTable.getTableName)
+      .asScala
+      .map { column =>
+        new StructField(column.getColName, column.getDataType)
+      }
+    CarbonTableOutputFormat.setInputSchema(conf, new StructType(fieldList.asJava))
+    val loadModel = CarbonTableOutputFormat.getLoadModel(conf)
+    loadModel.setSerializationNullFormat(
+      TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + ",\\N")
+    loadModel.setBadRecordsLoggerEnable(
+      TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + ",false")
+    loadModel.setBadRecordsAction(
+      TableOptionConstant.BAD_RECORDS_ACTION.getName() + ",force")
+    loadModel.setIsEmptyDataBadRecord(
+      DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + ",false")
+    val globalSortPartitions =
+      carbonTable.getTableInfo.getFactTable.getTableProperties.get("global_sort_partitions")
+    if (globalSortPartitions != null) {
+      loadModel.setGlobalSortPartitions(globalSortPartitions)
+    }
+    loadModel
+  }
 }
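
Sketch of the new compaction path wired up above (method names are from this patch; the
mergedLoadName value is illustrative):

    // 1. dataFrameOfSegments: scan only the segments under compaction
    //    (CarbonScanRDD + CarbonRowReadSupport) and wrap the rows in a DataFrame.
    // 2. getLoadModelForGlobalSort: build a CarbonLoadModel from the table's
    //    output-format configuration, propagating 'global_sort_partitions'.
    // 3. Point the load model at the merged segment, e.g. for "Segment_0.1":
    val segmentId = carbonMergerMapping.mergedLoadName.split("_")(1) // "0.1"
    outputModel.setSegmentId(segmentId)
    // 4. Reuse the existing global-sort load pipeline to write the merged segment:
    //    DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(...)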
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 99bc863..5e7039f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -518,7 +518,8 @@
       "LOAD_MIN_SIZE_INMB",
       "RANGE_COLUMN",
       "SORT_SCOPE",
-      "SORT_COLUMNS")
+      "SORT_COLUMNS",
+      "GLOBAL_SORT_PARTITIONS")
     supportedOptions.contains(propKey.toUpperCase)
   }
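
With this entry added, ALTER TABLE ... SET TBLPROPERTIES accepts GLOBAL_SORT_PARTITIONS
in any case, because the property key is upper-cased before the whitelist check:

    supportedOptions.contains("global_sort_partitions".toUpperCase) // true
    supportedOptions.contains("GLOBAL_SORT_PARTITIONS".toUpperCase) // true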