[CARBONDATA-3594] Optimize getSplits() during compaction
Problem:
In MergerRDD, for compaction of n segments per task, get splits is called n times.
Solution:
In MergerRDD, for per compaction task,get all validSegments and call getsplits only once for those valid segments
This closes #3475
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index febaeca..5e33ea7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -343,7 +343,7 @@
var noOfBlocks = 0
val taskInfoList = new java.util.ArrayList[Distributable]
- var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
+ var carbonInputSplits = mutable.ArrayBuffer[CarbonInputSplit]()
var allSplits = new java.util.ArrayList[InputSplit]
var splitsOfLastSegment: List[CarbonInputSplit] = null
@@ -359,6 +359,8 @@
loadMetadataDetails = SegmentStatusManager
.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath))
}
+
+ val validSegIds: java.util.List[String] = new util.ArrayList[String]()
// for each valid segment.
for (eachSeg <- carbonMergerMapping.validSegments) {
// In case of range column get the size for calculation of number of ranges
@@ -369,44 +371,48 @@
}
}
}
-
- // map for keeping the relation of a task and its blocks.
- job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg.getSegmentNo)
-
- if (updateStatusManager.getUpdateStatusDetails.length != 0) {
- updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg.getSegmentNo)
- }
-
- val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
- // get splits
- val splits = format.getSplits(job)
-
- // keep on assigning till last one is reached.
- if (null != splits && splits.size > 0) {
- splitsOfLastSegment = splits.asScala
- .map(_.asInstanceOf[CarbonInputSplit])
- .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
- }
- val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry =>
- val blockInfo = new TableBlockInfo(entry.getFilePath,
- entry.getStart, entry.getSegmentId,
- entry.getLocations, entry.getLength, entry.getVersion,
- updateStatusManager.getDeleteDeltaFilePath(
- entry.getFilePath,
- Segment.toSegment(entry.getSegmentId).getSegmentNo)
- )
- (!updated || (updated && (!CarbonUtil
- .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
- updateDetails, updateStatusManager)))) &&
- FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
- }
- if (rangeColumn != null) {
- totalTaskCount = totalTaskCount +
- CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
- }
- carbonInputSplits ++:= filteredSplits
- allSplits.addAll(filteredSplits.asJava)
+ validSegIds.add(eachSeg.getSegmentNo)
}
+
+ // map for keeping the relation of a task and its blocks.
+ job.getConfiguration
+ .set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, validSegIds.asScala.mkString(","))
+
+ val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
+ // get splits
+ val splits = format.getSplits(job)
+
+ // keep on assigning till last one is reached.
+ if (null != splits && splits.size > 0) {
+ splitsOfLastSegment = splits.asScala
+ .map(_.asInstanceOf[CarbonInputSplit])
+ .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
+ }
+ val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { entry =>
+ val segmentId = Segment.toSegment(entry.getSegmentId).getSegmentNo
+ val blockInfo = new TableBlockInfo(entry.getFilePath,
+ entry.getStart, entry.getSegmentId,
+ entry.getLocations, entry.getLength, entry.getVersion,
+ updateStatusManager.getDeleteDeltaFilePath(
+ entry.getFilePath,
+ segmentId)
+ )
+ if (updateStatusManager.getUpdateStatusDetails.length != 0) {
+ updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId)
+ }
+ // filter splits with V3 data file format
+ // if split is updated, then check for if it is valid segment based on update details
+ (!updated ||
+ (updated && (!CarbonUtil.isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
+ updateDetails, updateStatusManager)))) &&
+ FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
+ }
+ if (rangeColumn != null) {
+ totalTaskCount = totalTaskCount +
+ CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
+ }
+ carbonInputSplits ++= filteredSplits
+ allSplits.addAll(filteredSplits.asJava)
totalTaskCount = totalTaskCount / carbonMergerMapping.validSegments.size
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var allRanges: Array[Object] = new Array[Object](0)