[CARBONDATA-3592] Fix query on bloom in case of multiple data files in one segment

Problem:
1. Query on bloom datamap fails when there are multiple data files in one segment.
2. Query on bloom datamap gives wrong results when a segment contains multiple carbondata files.

Solution:
1. Previously, the pruned index files were cleared from the filteredIndexShardNames list for
every blocklet, so further pruning was not done on all the valid index files. The shard names
are now accumulated per segment and the list is overridden once per segment, so no valid shard
is dropped. Also handled the case where a wrong blocklet id was passed while creating the
blocklet from the relative blocklet id.
2. Partition the splits based on block path so that all the CarbonInputSplits in a
CarbonMultiBlockSplit are used for bloom reading. This means one task per shard (unique block path).

This closes #3474
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index ca56962..0db5901 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -20,8 +20,10 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -152,18 +154,26 @@
    * Prune the segments from the already pruned blocklets.
    */
   public static void pruneSegments(List<Segment> segments, List<ExtendedBlocklet> prunedBlocklets) {
-    Set<Segment> validSegments = new HashSet<>();
+    Map<Segment, Set<String>> validSegments = new HashMap<>();
     for (ExtendedBlocklet blocklet : prunedBlocklets) {
-      // Clear the old pruned index files if any present
-      blocklet.getSegment().getFilteredIndexShardNames().clear();
       // Set the pruned index file to the segment
       // for further pruning.
       String shardName = CarbonTablePath.getShardName(blocklet.getFilePath());
-      blocklet.getSegment().setFilteredIndexShardName(shardName);
-      validSegments.add(blocklet.getSegment());
+      // Add the existing shards to corresponding segments
+      Set<String> existingShards = validSegments.get(blocklet.getSegment());
+      if (existingShards == null) {
+        existingShards = new HashSet<>();
+        validSegments.put(blocklet.getSegment(), existingShards);
+      }
+      existingShards.add(shardName);
+    }
+    // override the shards list in the segments.
+    for (Map.Entry<Segment, Set<String>> entry : validSegments.entrySet()) {
+      entry.getKey().setFilteredIndexShardNames(entry.getValue());
     }
     segments.clear();
-    segments.addAll(validSegments);
+    // add the new segments to the segments list.
+    segments.addAll(validSegments.keySet());
   }
 
   static List<ExtendedBlocklet> pruneDataMaps(CarbonTable table,
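For reference, a minimal standalone sketch of the accumulation idiom the new loop uses, written with `computeIfAbsent` as an equivalent of the patch's get/put-if-null pattern. The class name and the (segmentId, shardName) sample pairs are assumptions; the real method works on `ExtendedBlocklet` and derives shard names via `CarbonTablePath.getShardName`:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ShardAccumulationSketch {
  public static void main(String[] args) {
    // Pruned (segmentId, shardName) pairs; two shards belong to segment "0".
    String[][] prunedBlocklets = {
        {"0", "shard-a"}, {"0", "shard-b"}, {"1", "shard-c"}
    };

    // Accumulate every shard name per segment instead of clearing the
    // list on each blocklet, which is what dropped shards before the fix.
    Map<String, Set<String>> shardsPerSegment = new HashMap<>();
    for (String[] blocklet : prunedBlocklets) {
      shardsPerSegment
          .computeIfAbsent(blocklet[0], k -> new HashSet<>())
          .add(blocklet[1]);
    }

    // Each segment now carries the full set of shards for further pruning.
    // Prints: 0 -> [shard-a, shard-b], 1 -> [shard-c] (set order may vary)
    shardsPerSegment.forEach((segment, shards) ->
        System.out.println(segment + " -> " + shards));
  }
}
```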
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 532fd74..6384ee9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -274,8 +274,8 @@
     return filteredIndexShardNames;
   }
 
-  public void setFilteredIndexShardName(String filteredIndexShardName) {
-    this.filteredIndexShardNames.add(filteredIndexShardName);
+  public void setFilteredIndexShardNames(Set<String> filteredIndexShardNames) {
+    this.filteredIndexShardNames = filteredIndexShardNames;
   }
 
   @Override
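The setter now replaces the whole set instead of appending one name. A hedged sketch of why that matters across two pruning passes; the `SegmentSketch` stand-in and sample shard names are hypothetical:

```java
import java.util.HashSet;
import java.util.Set;

public class SegmentSetterSketch {

  static class SegmentSketch {
    Set<String> filteredIndexShardNames = new HashSet<>();

    // Old, append-style setter: names from an earlier pruning pass survive
    // unless every caller remembers to clear the set first.
    void setFilteredIndexShardName(String shardName) {
      filteredIndexShardNames.add(shardName);
    }

    // New, replace-style setter: each pruning pass installs exactly the
    // shard set it computed.
    void setFilteredIndexShardNames(Set<String> shardNames) {
      filteredIndexShardNames = shardNames;
    }
  }

  public static void main(String[] args) {
    SegmentSketch segment = new SegmentSketch();
    segment.setFilteredIndexShardName("shard-stale");

    Set<String> secondPass = new HashSet<>();
    secondPass.add("shard-a");
    segment.setFilteredIndexShardNames(secondPass);

    // Prints [shard-a]: the stale name is gone without an explicit clear().
    System.out.println(segment.filteredIndexShardNames);
  }
}
```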
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index be29e63..30f9943 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -916,6 +916,9 @@
         if (diff < 0) {
           relativeBlockletId = (short) (diff + blockletCount);
           break;
+        } else if (diff == 0) {
+          relativeBlockletId++;
+          break;
         }
         rowIndex++;
       }
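The `diff == 0` branch handles blocklet ids that fall exactly on a block boundary. A minimal standalone sketch of the absolute-to-relative mapping this loop performs; the method name, signature, and sample counts are assumptions, since the real loop in `BlockDataMap` walks the stored index rows:

```java
public class RelativeBlockletIdSketch {

  /**
   * Map an absolute blocklet id (counted across all blocks of a segment)
   * to a block index plus a blocklet id relative to that block.
   * blockletCounts holds the number of blocklets per block (sample data).
   */
  static int[] toRelative(int absoluteBlockletId, int[] blockletCounts) {
    int remaining = absoluteBlockletId;
    for (int blockIndex = 0; blockIndex < blockletCounts.length; blockIndex++) {
      if (remaining < blockletCounts[blockIndex]) {
        return new int[] {blockIndex, remaining};
      }
      remaining -= blockletCounts[blockIndex];
    }
    throw new IllegalArgumentException(
        "blocklet id out of range: " + absoluteBlockletId);
  }

  public static void main(String[] args) {
    int[] counts = {3, 2, 4}; // three blocks with 3, 2 and 4 blocklets
    // Absolute id 3 is exactly the boundary between block 0 and block 1:
    // the kind of id the patched boundary branch keeps from being
    // attributed to the wrong block.
    int[] result = toRelative(3, counts);
    System.out.println("block=" + result[0] + " relativeId=" + result[1]);
    // prints: block=1 relativeId=0
  }
}
```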
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index f42cc8f..8079fa0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -489,11 +489,13 @@
       job.getConfiguration,
       tableInfo.getFactTable.getTableName)
 
+    // make the partitions based on block path so that all the CarbonInputSplits in a
+    // MultiBlockSplit are used for bloom reading. This means 1 task for 1 shard(unique block path).
     format
       .getSplits(job)
       .asScala
       .map(_.asInstanceOf[CarbonInputSplit])
-      .groupBy(p => (p.getSegmentId, p.taskId))
+      .groupBy(p => (p.getSegmentId, p.taskId, p.getBlockPath))
       .map { group =>
         new CarbonMultiBlockSplit(
           group._2.asJava,
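An equivalent of the new three-key grouping, sketched in Java streams for consistency with the other sketches; the `Split` stand-in and sample paths are hypothetical, while the real code groups `CarbonInputSplit`s in Scala as shown above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SplitGroupingSketch {

  // Hypothetical stand-in for CarbonInputSplit carrying only the grouping keys.
  static class Split {
    final String segmentId;
    final String taskId;
    final String blockPath;

    Split(String segmentId, String taskId, String blockPath) {
      this.segmentId = segmentId;
      this.taskId = taskId;
      this.blockPath = blockPath;
    }
  }

  public static void main(String[] args) {
    List<Split> splits = Arrays.asList(
        new Split("0", "task-0", "/store/part-0-0.carbondata"),
        new Split("0", "task-0", "/store/part-0-0.carbondata"),
        new Split("0", "task-0", "/store/part-0-1.carbondata"));

    // Grouping by (segmentId, taskId) alone put both block paths into one
    // group; adding blockPath yields one group per shard, i.e. one
    // bloom-rebuild task per unique block path.
    Map<List<String>, List<Split>> bySegmentTaskAndBlock = splits.stream()
        .collect(Collectors.groupingBy(
            s -> Arrays.asList(s.segmentId, s.taskId, s.blockPath)));

    bySegmentTaskAndBlock.forEach((key, group) ->
        System.out.println(key + " -> " + group.size() + " split(s)"));
  }
}
```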