[CARBONDATA-3874] segment mismatch between maintable and SI table when load with concurrency

Why is this PR needed?
1. segment mismatch between main table and SI table with concurrency loads
2. In concurrent loads, if one of the load failed for SI table then 'isSITableEnabled' will be
disabled(isSITableEnabled = false). So in failed SI event listener case we are just checking SI
enabled is true(isSITableEnabled == true) then we are not loading current load to SI table. In
concurrent scenarios, this might be happening as SI enabled state is true but segment difference may exist.

What changes were proposed in this PR?
So instead of checking just SI enabled is true(isSITableEnabled == true) we should also check if any
segment difference between main table and SI table. The final output flag checking will be as follows.
if ( isSITableEnabled == true || mainTblAndSidiff == true ) { --- }

This closes #3811
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index be0c614..2e979d6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -49,6 +49,7 @@
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
@@ -390,8 +391,13 @@
       }
       val indexTablePath = CarbonTablePath
         .getMetadataPath(tableInfo.getOrCreateAbsoluteTableIdentifier.getTablePath)
+      val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+        SegmentStatusManager.readLoadMetadata(indexTablePath)
       val isMaintableSegEqualToSISegs = CarbonInternalLoaderUtil
-        .checkMainTableSegEqualToSISeg(carbonTable.getMetadataPath, indexTablePath)
+        .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
+          siTblLoadMetadataDetails)
       if (isMaintableSegEqualToSISegs) {
         // enable the SI table
         sparkSession.sql(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index 2071385..74f2f53 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -79,8 +79,20 @@
                   .getTableMetadata(TableIdentifier(indexTableName,
                     Some(carbonLoadModel.getDatabaseName))).storage.properties
                   .getOrElse("isSITableEnabled", "true").toBoolean
+                val indexTable = metaStore
+                  .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
+                    sparkSession)
+                  .asInstanceOf[CarbonRelation]
+                  .carbonTable
 
-                if (!isLoadSIForFailedSegments) {
+                val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+                  SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+                val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+                  SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+                if (!isLoadSIForFailedSegments
+                    || !CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
+                  mainTblLoadMetadataDetails,
+                  siTblLoadMetadataDetails)) {
                   val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
                     indexTableName)
                   val secondaryIndex = IndexModel(Some(carbonTable.getDatabaseName),
@@ -88,11 +100,6 @@
                     indexColumns.split(",").toList,
                     indexTableName)
 
-                  val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-                  val indexTable = metaStore
-                    .lookupRelation(Some(carbonLoadModel.getDatabaseName),
-                      indexTableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
-
                   var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
                   // If it empty, then no need to do further computations because the
                   // tabletstatus might not have been created and hence next load will take care
@@ -162,11 +169,22 @@
                       // get the current load metadata details of the index table
                       details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
                     }
+
+                    // get updated main table segments and si table segments
+                    val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+                      SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+                    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+                      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+
+                    // check if main table has load in progress and SI table has no load
+                    // in progress entry, then no need to enable the SI table
                     // Only if the valid segments of maintable match the valid segments of SI
                     // table then we can enable the SI for query
                     if (CarbonInternalLoaderUtil
-                      .checkMainTableSegEqualToSISeg(carbonTable.getMetadataPath,
-                        indexTable.getMetadataPath)) {
+                          .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
+                            siTblLoadMetadataDetails)
+                        && CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
+                      mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
                       // enable the SI table if it was disabled earlier due to failure during SI
                       // creation time
                       sparkSession.sql(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
index 6d6ad28..b4066fc 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
@@ -304,14 +304,13 @@
 
   /**
    * Method to check if main table and SI have same number of valid segments or not
-   *
    */
-  public static boolean checkMainTableSegEqualToSISeg(String carbonTablePath,
-      String indexTablePath) {
+  public static boolean checkMainTableSegEqualToSISeg(LoadMetadataDetails[] mainTableLoadMetadataDetails,
+                                                      LoadMetadataDetails[] siTableLoadMetadataDetails) {
     List<String> mainTableSegmentsList =
-        getListOfValidSlices(SegmentStatusManager.readCarbonMetaData(carbonTablePath));
+        getListOfValidSlices(mainTableLoadMetadataDetails);
     List<String> indexList =
-        getListOfValidSlices(SegmentStatusManager.readLoadMetadata(indexTablePath));
+        getListOfValidSlices(siTableLoadMetadataDetails);
     Collections.sort(mainTableSegmentsList);
     Collections.sort(indexList);
     if (indexList.size() != mainTableSegmentsList.size()) {
@@ -325,4 +324,27 @@
     return true;
   }
 
+  /**
+   * Method to check if main table has in progress load and same segment not present in SI
+   */
+  public static boolean checkInProgLoadInMainTableAndSI(CarbonTable carbonTable,
+                                                      LoadMetadataDetails[] mainTableLoadMetadataDetails,
+                                                      LoadMetadataDetails[] siTableLoadMetadataDetails) {
+    List<String> allSiSlices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (LoadMetadataDetails oneLoad : siTableLoadMetadataDetails) {
+      allSiSlices.add(oneLoad.getLoadName());
+    }
+
+    if (mainTableLoadMetadataDetails.length != 0) {
+      for (LoadMetadataDetails loadDetail : mainTableLoadMetadataDetails) {
+        // if load in progress and check if same load is present in SI.
+        if (SegmentStatusManager.isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(), loadDetail.getLoadName())) {
+          if (!allSiSlices.contains(loadDetail.getLoadName())) {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
 }