[CARBONDATA-1797] Segment_Index compaction should take compaction lock to support concurrent scenarios better

SEGMENT_INDEX compaction is not taking compaction lock. While concurrent operation, compaction may be successful but the output may not be as expected.

This closes #1553
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0619851..284587d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -95,6 +95,16 @@
       LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
           s".${ carbonLoadModel.getTableName }")
       try {
+        if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+          // Just launch job to merge index and return
+          CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+            CarbonDataMergerUtil.getValidSegmentList(
+              carbonTable.getAbsoluteTableIdentifier).asScala,
+            carbonLoadModel.getTablePath,
+            carbonTable, true)
+          lock.unlock()
+          return
+        }
         startCompactionThreads(sqlContext,
           carbonLoadModel,
           storeLocation,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 6e11fe4..2cd771c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -139,14 +139,6 @@
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel)
     }
-    if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
-      // Just launch job to merge index and return
-      CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
-        CarbonDataMergerUtil.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala,
-        carbonLoadModel.getTablePath,
-        carbonTable, true)
-      return
-    }
     // reading the start time of data load.
     val loadStartTime : Long =
       if (alterTableModel.factTimeStamp.isEmpty) {
@@ -192,6 +184,16 @@
         LOGGER.info("Acquired the compaction lock for table" +
                     s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
+          if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+            // Just launch job to merge index and return
+            CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+              CarbonDataMergerUtil.getValidSegmentList(
+                carbonTable.getAbsoluteTableIdentifier).asScala,
+              carbonLoadModel.getTablePath,
+              carbonTable, true)
+            lock.unlock()
+            return
+          }
           CarbonDataRDDFactory.startCompactionThreads(sqlContext,
             carbonLoadModel,
             storeLocation,