[CARBONDATA-3524] Support global_sort compaction
[Background]
For a GLOBAL_SORT table, segments are currently compacted using the LOCAL_SORT flow.
[Motivation]
After such a compaction the data is no longer globally sorted, which may hurt query performance. It is better to compact using GLOBAL_SORT so that query performance is preserved.
[Limitations]
Tables with RANGE_COLUMN still use the old flow to load data.
Only standard tables are supported.
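[Example]
A minimal sketch of the behavior exercised by the test cases below; the table name and file paths are placeholders:

CREATE TABLE sales(id INT, name STRING, city STRING, age INT)
STORED BY 'org.apache.carbondata.format'
TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='2')

LOAD DATA LOCAL INPATH '/path/to/data1.csv' INTO TABLE sales
LOAD DATA LOCAL INPATH '/path/to/data2.csv' INTO TABLE sales

-- GLOBAL_SORT_PARTITIONS is now alterable, so it can be tuned before triggering compaction
ALTER TABLE sales SET TBLPROPERTIES('GLOBAL_SORT_PARTITIONS'='1')
ALTER TABLE sales COMPACT 'MAJOR'

The compacted segment is written through the GLOBAL_SORT load flow, producing GLOBAL_SORT_PARTITIONS output files (the tests assert this via the index file count per merged segment).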
This closes #3389
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
index bad8bdc..ed0f466 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -42,7 +42,7 @@
"""
| CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT)
| STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='1')
""".stripMargin)
sql("DROP TABLE IF EXISTS carbon_localsort")
@@ -68,6 +68,7 @@
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+ sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
@@ -108,6 +109,7 @@
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+ sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
@@ -138,6 +140,7 @@
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+ sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
@@ -174,6 +177,7 @@
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+ sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
@@ -206,6 +210,7 @@
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+ sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
@@ -245,6 +250,7 @@
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+ sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
sql("clean files for table compaction_globalsort")
@@ -279,7 +285,7 @@
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
-
+ sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
sql("ALTER TABLE compaction_globalsort COMPACT 'minor'")
sql("clean files for table compaction_globalsort")
@@ -312,6 +318,7 @@
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+ sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
sql("clean files for table compaction_globalsort")
@@ -347,6 +354,7 @@
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+ sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')")
sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
sql("clean files for table compaction_globalsort")
@@ -378,7 +386,7 @@
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
-
+ sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')")
sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
sql("clean files for table compaction_globalsort")
checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
@@ -438,6 +446,7 @@
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE compaction_globalsort")
}
+ sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')")
sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
assert(getIndexFileCount("compaction_globalsort", "0.1") === 2)
@@ -453,6 +462,7 @@
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort")
sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE compaction_globalsort")
}
+ sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')")
sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
assert(getIndexFileCount("compaction_globalsort", "0.1") === 2)
@@ -479,6 +489,7 @@
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+ sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')")
sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
sql("clean files for table compaction_globalsort")
@@ -503,7 +514,7 @@
"""
| CREATE TABLE compaction_globalsort2(id INT, name STRING, city STRING, age INT)
| STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='id','SORT_SCOPE'='GLOBAL_SORT')
+ | TBLPROPERTIES('SORT_COLUMNS'='id','SORT_SCOPE'='GLOBAL_SORT', 'global_sort_partitions'='1')
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort2")
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort2")
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
index 4b027b9..a1d0fe0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
@@ -43,7 +43,7 @@
"""
| CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT)
| STORED BY 'org.apache.carbondata.format'
- | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='1')
""".stripMargin)
sql("DROP TABLE IF EXISTS carbon_localsort")
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 5eeee78..255f399 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -85,6 +85,9 @@
sql("DROP TABLE IF EXISTS carbon_globalsort2")
sql("DROP TABLE IF EXISTS carbon_globalsort_partitioned")
sql("DROP TABLE IF EXISTS carbon_globalsort_difftypes")
+ sql("DROP TABLE IF EXISTS carbon_globalsort_minor")
+ sql("DROP TABLE IF EXISTS carbon_globalsort_major")
+ sql("DROP TABLE IF EXISTS carbon_globalsort_custom")
}
// ----------------------------------- Compare Result -----------------------------------
@@ -183,6 +186,95 @@
sql("SELECT * FROM carbon_localsort_twice ORDER BY name, id"))
}
+ test("Compaction GLOBAL_SORT: minor") {
+ sql("DROP TABLE IF EXISTS carbon_globalsort_minor")
+ sql(
+ """
+ | CREATE TABLE carbon_globalsort_minor(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES(
+ | 'SORT_SCOPE'='GLOBAL_SORT',
+ | 'sort_columns' = 'name, city',
+ | 'AUTO_LOAD_MERGE'='false',
+ | 'COMPACTION_LEVEL_THRESHOLD'='3,0', 'GLOBAL_SORT_PARTITIONS'='3')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_minor")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_minor")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_minor")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_minor")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+ assertResult(4)(sql("show segments for table carbon_globalsort_minor").count())
+ sql("ALTER TABLE carbon_globalsort_minor COMPACT 'MINOR'")
+ assertResult(5)(sql("show segments for table carbon_globalsort_minor").count())
+ assertResult(3)(
+ sql("show segments for table carbon_globalsort_minor").rdd.filter(_.get(1).equals("Compacted")).count())
+ assert(getIndexFileCount("carbon_globalsort_minor", "0.1") === 3)
+ checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort_minor"), Seq(Row(48)))
+ checkAnswer(sql("SELECT * FROM carbon_globalsort_minor ORDER BY name, id"),
+ sql("SELECT * FROM carbon_globalsort ORDER BY name, id"))
+ }
+
+ test("Compaction GLOBAL_SORT: major") {
+ sql("DROP TABLE IF EXISTS carbon_globalsort_major")
+ sql(
+ """
+ | CREATE TABLE carbon_globalsort_major(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES(
+ | 'SORT_SCOPE'='GLOBAL_SORT',
+ | 'sort_columns' = 'name, city',
+ | 'AUTO_LOAD_MERGE'='false',
+ | 'MAJOR_COMPACTION_SIZE'='1024', 'GLOBAL_SORT_PARTITIONS'='4')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_major")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_major")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_major")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_major")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+ assertResult(4)(sql("show segments for table carbon_globalsort_major").count())
+ sql("ALTER TABLE carbon_globalsort_major COMPACT 'major'")
+ assertResult(5)(sql("show segments for table carbon_globalsort_major").count())
+ assertResult(4)(
+ sql("show segments for table carbon_globalsort_major").rdd.filter(_.get(1).equals("Compacted")).count())
+ assert(getIndexFileCount("carbon_globalsort_major", "0.1") === 4)
+ checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort_major"), Seq(Row(48)))
+ checkAnswer(sql("SELECT * FROM carbon_globalsort_major ORDER BY name, id"),
+ sql("SELECT * FROM carbon_globalsort ORDER BY name, id"))
+ }
+
+ test("Compaction GLOBAL_SORT: custom") {
+ sql("DROP TABLE IF EXISTS carbon_globalsort_custom")
+ sql(
+ """
+ | CREATE TABLE carbon_globalsort_custom(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES(
+ | 'SORT_SCOPE'='GLOBAL_SORT',
+ | 'sort_columns' = 'name, city',
+ | 'AUTO_LOAD_MERGE'='false', 'GLOBAL_SORT_PARTITIONS'='3')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_custom")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_custom")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_custom")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_custom")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort")
+ assertResult(4)(sql("show segments for table carbon_globalsort_custom").count())
+ sql("ALTER TABLE carbon_globalsort_custom COMPACT 'custom' WHERE SEGMENT.ID IN (0,1,2)")
+ assertResult(5)(sql("show segments for table carbon_globalsort_custom").count())
+ assertResult(3)(
+ sql("show segments for table carbon_globalsort_custom").rdd.filter(_.get(1).equals("Compacted")).count())
+ assert(getIndexFileCount("carbon_globalsort_custom", "0.1") === 3)
+ checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort_custom"), Seq(Row(48)))
+ checkAnswer(sql("SELECT * FROM carbon_globalsort_custom ORDER BY name, id"),
+ sql("SELECT * FROM carbon_globalsort_custom ORDER BY name, id"))
+ }
+
// ----------------------------------- Check Configurations -----------------------------------
// Waiting for merge SET feature[CARBONDATA-1065]
ignore("DDL > SET") {
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
index 2ea3d43..100ad17 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala
@@ -29,7 +29,7 @@
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema}
-private[spark] object SparkTypeConverter {
+object SparkTypeConverter {
def createSparkSchema(table: CarbonTable, columns: Seq[String]): StructType = {
Objects.requireNonNull(table)
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 77b7119..bfbf52d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -24,21 +24,39 @@
import scala.collection.JavaConverters._
import scala.collection.mutable
-import org.apache.spark.sql.SQLContext
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{InputSplit, Job}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
+import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
import org.apache.spark.util.MergeIndexUtil
+import org.apache.spark.CarbonInputMetrics
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.metadata.datatype.{StructField, StructType}
import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.events._
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.processing.util.TableOptionConstant
+import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.store.CarbonRowReadSupport
/**
* This class is used to perform compaction on carbon table.
@@ -195,6 +213,11 @@
carbonLoadModel,
carbonMergerMapping
).collect
+ } else if (SortScope.GLOBAL_SORT == carbonTable.getSortScope &&
+ !carbonTable.getSortColumns.isEmpty &&
+ carbonTable.getRangeColumn == null &&
+ CarbonUtil.isStandardCarbonTable(carbonTable)) {
+ compactSegmentsByGlobalSort(sc.sparkSession, carbonLoadModel, carbonMergerMapping)
} else {
new CarbonMergerRDD(
sc.sparkSession,
@@ -325,4 +348,117 @@
}
}
+ /**
+ * compact segments by global sort
+ */
+ def compactSegmentsByGlobalSort(
+ sparkSession: SparkSession,
+ carbonLoadModel: CarbonLoadModel,
+ carbonMergerMapping: CarbonMergerMapping): Array[(String, Boolean)] = {
+ val dataFrame = dataFrameOfSegments(
+ sparkSession,
+ carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+ carbonMergerMapping.validSegments)
+ // generate LoadModel which can be used by the global_sort flow
+ val outputModel = getLoadModelForGlobalSort(
+ sparkSession, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+ carbonMergerMapping.validSegments)
+ outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
+ DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+ sparkSession,
+ Option(dataFrame),
+ outputModel,
+ SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+ .map { row =>
+ (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+ }
+ }
+
+ /**
+ * create DataFrame based on the specified segments
+ */
+ def dataFrameOfSegments(
+ sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ segments: Array[Segment]
+ ): DataFrame = {
+ val columns = carbonTable
+ .getCreateOrderColumn(carbonTable.getTableName)
+ .asScala
+ .map(_.getColName)
+ .toArray
+ val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
+ val rdd: RDD[Row] = new CarbonScanRDD[CarbonRow](
+ sparkSession,
+ columnProjection = new CarbonProjection(columns),
+ null,
+ carbonTable.getAbsoluteTableIdentifier,
+ carbonTable.getTableInfo.serialize,
+ carbonTable.getTableInfo,
+ new CarbonInputMetrics,
+ null,
+ null,
+ classOf[CarbonRowReadSupport],
+ splitsOfSegments(sparkSession, carbonTable, segments))
+ .map { row =>
+ new GenericRow(row.getData.asInstanceOf[Array[Any]])
+ }
+ sparkSession.createDataFrame(rdd, schema)
+ }
+
+ /**
+ * get splits of specified segments
+ */
+ def splitsOfSegments(
+ sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ segments: Array[Segment]
+ ): java.util.List[InputSplit] = {
+ val jobConf = new JobConf(SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ val job = Job.getInstance(jobConf)
+ val conf = job.getConfiguration
+ CarbonInputFormat.setTablePath(conf, carbonTable.getTablePath)
+ CarbonInputFormat.setTableInfo(conf, carbonTable.getTableInfo)
+ CarbonInputFormat.setDatabaseName(conf, carbonTable.getDatabaseName)
+ CarbonInputFormat.setTableName(conf, carbonTable.getTableName)
+ CarbonInputFormat.setQuerySegment(conf, segments.map(_.getSegmentNo).mkString(","))
+ new CarbonTableInputFormat[Object].getSplits(job)
+ }
+
+ /**
+ * create CarbonLoadModel for global_sort compaction
+ */
+ def getLoadModelForGlobalSort(
+ sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ segments: Array[Segment]
+ ): CarbonLoadModel = {
+ val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
+ CarbonTableOutputFormat.setDatabaseName(conf, carbonTable.getDatabaseName)
+ CarbonTableOutputFormat.setTableName(conf, carbonTable.getTableName)
+ CarbonTableOutputFormat.setCarbonTable(conf, carbonTable)
+ val fieldList = carbonTable
+ .getCreateOrderColumn(carbonTable.getTableName)
+ .asScala
+ .map { column =>
+ new StructField(column.getColName, column.getDataType)
+ }
+ CarbonTableOutputFormat.setInputSchema(conf, new StructType(fieldList.asJava))
+ val loadModel = CarbonTableOutputFormat.getLoadModel(conf)
+ loadModel.setSerializationNullFormat(
+ TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + ",\\N")
+ loadModel.setBadRecordsLoggerEnable(
+ TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + ",false")
+ loadModel.setBadRecordsAction(
+ TableOptionConstant.BAD_RECORDS_ACTION.getName() + ",force")
+ loadModel.setIsEmptyDataBadRecord(
+ DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + ",false")
+ val globalSortPartitions =
+ carbonTable.getTableInfo.getFactTable.getTableProperties.get("global_sort_partitions")
+ if (globalSortPartitions != null) {
+ loadModel.setGlobalSortPartitions(globalSortPartitions)
+ }
+ loadModel
+ }
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 99bc863..5e7039f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -518,7 +518,8 @@
"LOAD_MIN_SIZE_INMB",
"RANGE_COLUMN",
"SORT_SCOPE",
- "SORT_COLUMNS")
+ "SORT_COLUMNS",
+ "GLOBAL_SORT_PARTITIONS")
supportedOptions.contains(propKey.toUpperCase)
}