[CARBONDATA-3594] Optimize getSplits() during compaction

Problem:
In CarbonMergerRDD, when a compaction task covers n segments, getSplits() is called n times (once per segment).
Solution:
In CarbonMergerRDD, for each compaction task, collect the IDs of all valid segments first and call getSplits() only once for those valid segments.
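A condensed sketch of the call pattern changed by this patch (simplified pseudocode, not the full patch; names such as carbonMergerMapping, format, job and CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS are taken from the diff below, and the per-split filtering logic is omitted):

    // Before: getSplits() is invoked once per segment, i.e. n times for n segments.
    for (eachSeg <- carbonMergerMapping.validSegments) {
      job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg.getSegmentNo)
      val splits = format.getSplits(job)
      // ... filter and collect the splits of this segment
    }

    // After: collect all valid segment ids first, then call getSplits() only once.
    val validSegIds: java.util.List[String] = new java.util.ArrayList[String]()
    for (eachSeg <- carbonMergerMapping.validSegments) {
      validSegIds.add(eachSeg.getSegmentNo)
    }
    job.getConfiguration
      .set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, validSegIds.asScala.mkString(","))
    val splits = format.getSplits(job)
    // ... filter and collect the splits of all segments in a single pass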

This closes #3475
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index febaeca..5e33ea7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -343,7 +343,7 @@
     var noOfBlocks = 0
 
     val taskInfoList = new java.util.ArrayList[Distributable]
-    var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
+    var carbonInputSplits = mutable.ArrayBuffer[CarbonInputSplit]()
     var allSplits = new java.util.ArrayList[InputSplit]
 
     var splitsOfLastSegment: List[CarbonInputSplit] = null
@@ -359,6 +359,8 @@
       loadMetadataDetails = SegmentStatusManager
         .readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath))
     }
+
+    val validSegIds: java.util.List[String] = new util.ArrayList[String]()
     // for each valid segment.
     for (eachSeg <- carbonMergerMapping.validSegments) {
       // In case of range column get the size for calculation of number of ranges
@@ -369,44 +371,48 @@
           }
         }
       }
-
-      // map for keeping the relation of a task and its blocks.
-      job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg.getSegmentNo)
-
-      if (updateStatusManager.getUpdateStatusDetails.length != 0) {
-         updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg.getSegmentNo)
-      }
-
-      val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
-      // get splits
-      val splits = format.getSplits(job)
-
-      // keep on assigning till last one is reached.
-      if (null != splits && splits.size > 0) {
-        splitsOfLastSegment = splits.asScala
-          .map(_.asInstanceOf[CarbonInputSplit])
-          .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
-      }
-       val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry =>
-        val blockInfo = new TableBlockInfo(entry.getFilePath,
-          entry.getStart, entry.getSegmentId,
-          entry.getLocations, entry.getLength, entry.getVersion,
-          updateStatusManager.getDeleteDeltaFilePath(
-            entry.getFilePath,
-            Segment.toSegment(entry.getSegmentId).getSegmentNo)
-        )
-        (!updated || (updated && (!CarbonUtil
-          .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
-            updateDetails, updateStatusManager)))) &&
-        FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
-      }
-      if (rangeColumn != null) {
-        totalTaskCount = totalTaskCount +
-                         CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
-      }
-      carbonInputSplits ++:= filteredSplits
-      allSplits.addAll(filteredSplits.asJava)
+      validSegIds.add(eachSeg.getSegmentNo)
     }
+
+    // map for keeping the relation of a task and its blocks.
+    job.getConfiguration
+      .set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, validSegIds.asScala.mkString(","))
+
+    val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
+    // get splits
+    val splits = format.getSplits(job)
+
+    // keep on assigning till last one is reached.
+    if (null != splits && splits.size > 0) {
+      splitsOfLastSegment = splits.asScala
+        .map(_.asInstanceOf[CarbonInputSplit])
+        .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
+    }
+    val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { entry =>
+      val segmentId = Segment.toSegment(entry.getSegmentId).getSegmentNo
+      val blockInfo = new TableBlockInfo(entry.getFilePath,
+        entry.getStart, entry.getSegmentId,
+        entry.getLocations, entry.getLength, entry.getVersion,
+        updateStatusManager.getDeleteDeltaFilePath(
+          entry.getFilePath,
+          segmentId)
+      )
+      if (updateStatusManager.getUpdateStatusDetails.length != 0) {
+        updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId)
+      }
+      // filter splits with V3 data file format
+      // if split is updated, then check for if it is valid segment based on update details
+      (!updated ||
+       (updated && (!CarbonUtil.isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
+         updateDetails, updateStatusManager)))) &&
+      FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
+    }
+    if (rangeColumn != null) {
+      totalTaskCount = totalTaskCount +
+                       CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
+    }
+    carbonInputSplits ++= filteredSplits
+    allSplits.addAll(filteredSplits.asJava)
     totalTaskCount = totalTaskCount / carbonMergerMapping.validSegments.size
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     var allRanges: Array[Object] = new Array[Object](0)