[CARBONDATA-1797] Segment_Index compaction should take compaction lock to support concurrent scenarios better
SEGMENT_INDEX compaction is not taking compaction lock. While concurrent operation, compaction may be successful but the output may not be as expected.
This closes #1553
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0619851..284587d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -95,6 +95,16 @@
LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
s".${ carbonLoadModel.getTableName }")
try {
+ if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+ // Just launch job to merge index and return
+ CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+ CarbonDataMergerUtil.getValidSegmentList(
+ carbonTable.getAbsoluteTableIdentifier).asScala,
+ carbonLoadModel.getTablePath,
+ carbonTable, true)
+ lock.unlock()
+ return
+ }
startCompactionThreads(sqlContext,
carbonLoadModel,
storeLocation,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 6e11fe4..2cd771c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -139,14 +139,6 @@
if (null == carbonLoadModel.getLoadMetadataDetails) {
CommonUtil.readLoadMetadataDetails(carbonLoadModel)
}
- if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
- // Just launch job to merge index and return
- CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
- CarbonDataMergerUtil.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala,
- carbonLoadModel.getTablePath,
- carbonTable, true)
- return
- }
// reading the start time of data load.
val loadStartTime : Long =
if (alterTableModel.factTimeStamp.isEmpty) {
@@ -192,6 +184,16 @@
LOGGER.info("Acquired the compaction lock for table" +
s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
+ if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+ // Just launch job to merge index and return
+ CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+ CarbonDataMergerUtil.getValidSegmentList(
+ carbonTable.getAbsoluteTableIdentifier).asScala,
+ carbonLoadModel.getTablePath,
+ carbonTable, true)
+ lock.unlock()
+ return
+ }
CarbonDataRDDFactory.startCompactionThreads(sqlContext,
carbonLoadModel,
storeLocation,