[CARBONDATA-3834] Fix partition table issues when 'carbon.merge.index.in.segment' is false

Why is this PR needed?
For a partitioned table, the segment directory and the segment file in metadata are not created when the 'carbon.merge.index.in.segment' property is set to false. In addition, the index files present in each partition's '.tmp' directory are deleted without first being moved to the partition directory where the corresponding '.carbondata' files exist.
As a result, all queries on the table fail because neither the segment file nor the index files exist.

This issue was introduced by an earlier optimization in PR #3535.

What changes were proposed in this PR?
When the 'carbon.merge.index.in.segment' property is false, create the segment directory and the segment file, and move the index files from each partition's temporary directory to the partition directory where the corresponding '.carbondata' file exists.
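
A minimal sketch of how the new SegmentFileStore.writeSegmentFile overload added below is expected to be called. The table path, segment id, timestamp, partition locations and index file names are illustrative placeholders; in the committer they come from the CarbonLoadModel and the job configuration populated during task commit.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.carbondata.core.metadata.SegmentFileStore;

public final class WriteSegmentFileSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative values; the real ones come from CarbonLoadModel / job configuration.
    String tablePath = "/tmp/store/iud_db/partition_nomerge_index";
    String segmentId = "0";
    String factTimeStamp = "1589459226000";

    // Partition locations touched by this load.
    List<String> partitions = Arrays.asList(
        tablePath + "/country=India",
        tablePath + "/country=China");

    // Index file names collected per partition during task commit.
    Map<String, Set<String>> indexFiles = new HashMap<>();
    indexFiles.put(partitions.get(0),
        Collections.singleton("0_batchno0-0-0-1589459226000.carbonindex"));
    indexFiles.put(partitions.get(1),
        Collections.singleton("1_batchno0-0-0-1589459226000.carbonindex"));

    // Writes Metadata/segments/<segmentId>_<timestamp>.segment and moves the index
    // files out of each partition's "<segmentId>_<timestamp>.tmp" directory.
    SegmentFileStore.writeSegmentFile(
        tablePath, segmentId, factTimeStamp, partitions, indexFiles);
  }
}
```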
Also fix the handling of an invalid 'carbon.timeseries.first.day.of.week' value: previously a NullPointerException was thrown instead of falling back to the default value.
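
A standalone sketch of the corrected fallback in TimeSeriesUDF. The enum here only mirrors DaysOfWeekEnum (its values are assumed to map to java.util.Calendar day constants), and the default value is assumed to be "SUNDAY":

```java
import java.util.Calendar;

public final class FirstDayOfWeekSketch {
  // Mirror of DaysOfWeekEnum; assumed to map to java.util.Calendar day constants.
  enum DaysOfWeekEnum {
    SUNDAY(Calendar.SUNDAY), MONDAY(Calendar.MONDAY), TUESDAY(Calendar.TUESDAY),
    WEDNESDAY(Calendar.WEDNESDAY), THURSDAY(Calendar.THURSDAY),
    FRIDAY(Calendar.FRIDAY), SATURDAY(Calendar.SATURDAY);

    private final int ordinalValue;
    DaysOfWeekEnum(int ordinalValue) { this.ordinalValue = ordinalValue; }
    int getOrdinal() { return ordinalValue; }
  }

  // Stand-in for CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT.
  private static final String DEFAULT_FIRST_DAY = "SUNDAY";

  static int resolveFirstDayOfWeek(String configuredValue) {
    try {
      return DaysOfWeekEnum.valueOf(configuredValue.toUpperCase()).getOrdinal();
    } catch (IllegalArgumentException | NullPointerException ex) {
      // Before the fix, the catch block re-read the property using the default *value*
      // as the property key, got null back, and threw a NullPointerException.
      // After the fix, it falls back to the default constant directly.
      return DaysOfWeekEnum.valueOf(DEFAULT_FIRST_DAY).getOrdinal();
    }
  }

  public static void main(String[] args) {
    System.out.println(resolveFirstDayOfWeek("monday"));  // 2 (Calendar.MONDAY)
    System.out.println(resolveFirstDayOfWeek("funday"));  // falls back to SUNDAY -> 1
  }
}
```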

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #3776
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 2f274c3..545d5b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -28,11 +28,13 @@
 import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -105,6 +107,56 @@
   }
 
   /**
+   * Creates and writes the segment file and removes the temporary directories from all the
+   * respective partition directories. This method is invoked only when {@link
+   * CarbonCommonConstants#CARBON_MERGE_INDEX_IN_SEGMENT} is disabled.
+   * @param tablePath Table path
+   * @param segmentId Segment id
+   * @param timeStamp FactTimeStamp
+   * @param partitionNames Partition names list
+   * @param indexFileNames Index files map with partition as key and index file names set as value
+   * @throws IOException
+   */
+  public static void writeSegmentFile(String tablePath, String segmentId, String timeStamp,
+      List<String> partitionNames, Map<String, Set<String>> indexFileNames) throws IOException {
+    SegmentFileStore.SegmentFile finalSegmentFile = null;
+    boolean isRelativePath;
+    String partitionLoc;
+    for (String partition : partitionNames) {
+      isRelativePath = false;
+      partitionLoc = partition;
+      if (partitionLoc.startsWith(tablePath)) {
+        partitionLoc = partitionLoc.substring(tablePath.length());
+        isRelativePath = true;
+      }
+      SegmentFileStore.SegmentFile segmentFile = new SegmentFileStore.SegmentFile();
+      SegmentFileStore.FolderDetails folderDetails = new SegmentFileStore.FolderDetails();
+      folderDetails.setFiles(indexFileNames.get(partition));
+      folderDetails.setPartitions(
+          Collections.singletonList(partitionLoc.substring(partitionLoc.indexOf("/") + 1)));
+      folderDetails.setRelative(isRelativePath);
+      folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+      segmentFile.getLocationMap().put(partitionLoc, folderDetails);
+      if (finalSegmentFile != null) {
+        finalSegmentFile = finalSegmentFile.merge(segmentFile);
+      } else {
+        finalSegmentFile = segmentFile;
+      }
+    }
+    Objects.requireNonNull(finalSegmentFile);
+    String segmentFilesLocation = CarbonTablePath.getSegmentFilesLocation(tablePath);
+    CarbonFile locationFile = FileFactory.getCarbonFile(segmentFilesLocation);
+    if (!locationFile.exists()) {
+      locationFile.mkdirs();
+    }
+    String segmentFileName = SegmentFileStore.genSegmentFileName(segmentId, timeStamp);
+    SegmentFileStore.writeSegmentFile(finalSegmentFile,
+        segmentFilesLocation + "/" + segmentFileName + CarbonTablePath.SEGMENT_EXT);
+    SegmentFileStore
+        .moveFromTempFolder(finalSegmentFile, segmentId + "_" + timeStamp + ".tmp", tablePath);
+  }
+
+  /**
    * Write segment information to the segment folder with indexfilename and
    * corresponding partitions.
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
index df03bcc..36c0260 100644
--- a/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
@@ -176,9 +176,9 @@
     } catch (IllegalArgumentException ex) {
       LOGGER.warn("Invalid value set for first of the week. Considering the default value as: "
           + CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT);
-      firstDayOfWeek = DaysOfWeekEnum.valueOf(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT)
-          .toUpperCase()).getOrdinal();
+      firstDayOfWeek =
+          DaysOfWeekEnum.valueOf(CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT)
+              .getOrdinal();
     }
     calanderThreadLocal.get().setFirstDayOfWeek(firstDayOfWeek);
   }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 4b8fc43..6aa3067 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -40,6 +41,7 @@
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonSessionInfo;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.OperationContext;
@@ -254,7 +256,8 @@
   private void commitJobForPartition(JobContext context, boolean overwriteSet,
       CarbonLoadModel loadModel, String partitionPath) throws IOException {
     String size = context.getConfiguration().get("carbon.datasize", "");
-    if (size.equalsIgnoreCase("0")) {
+    String indexSize = context.getConfiguration().get("carbon.indexsize", "");
+    if (size.equalsIgnoreCase("0") || indexSize.equalsIgnoreCase("0")) {
       CarbonLoaderUtil.updateTableStatusForFailure(loadModel);
       return;
     }
@@ -270,6 +273,18 @@
       uuid = operationContext.getProperty("uuid").toString();
     }
     String tempFolderPath = loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp() + ".tmp";
+    boolean isMergeIndex = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+            CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT));
+    if (!isMergeIndex) {
+      Map<String, Set<String>> indexFileNameMap = (Map<String, Set<String>>) ObjectSerializationUtil
+          .convertStringToObject(context.getConfiguration().get("carbon.index.files.name"));
+      List<String> partitionList =
+          (List<String>) ObjectSerializationUtil.convertStringToObject(partitionPath);
+      SegmentFileStore.writeSegmentFile(loadModel.getTablePath(), loadModel.getSegmentId(),
+          String.valueOf(loadModel.getFactTimeStamp()), partitionList, indexFileNameMap);
+      tempFolderPath = null;
+    }
     if (operationContext != null) {
       operationContext.setProperty("partitionPath", partitionPath);
       operationContext.setProperty("tempPath", tempFolderPath);
@@ -288,7 +303,11 @@
     String segmentFileName = SegmentFileStore.genSegmentFileName(
         loadModel.getSegmentId(), String.valueOf(loadModel.getFactTimeStamp()));
     newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
-    newMetaEntry.setIndexSize("" + loadModel.getMetrics().getMergeIndexSize());
+    if (isMergeIndex) {
+      newMetaEntry.setIndexSize("" + loadModel.getMetrics().getMergeIndexSize());
+    } else if (!StringUtils.isEmpty(indexSize)) {
+      newMetaEntry.setIndexSize(indexSize);
+    }
     if (!StringUtils.isEmpty(size)) {
       newMetaEntry.setDataSize(size);
     }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 225daf5..d9c2633 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -254,6 +254,8 @@
       taskCommits: Seq[TaskCommitMessage]): Unit = {
     if (isCarbonDataFlow(jobContext.getConfiguration)) {
       var dataSize = 0L
+      var indexLen = 0L
+      val indexFileNameMap = new util.HashMap[String, util.Set[String]]()
       val partitions =
         taskCommits
           .flatMap { taskCommit =>
@@ -264,6 +266,24 @@
                 if (size.isDefined) {
                   dataSize = dataSize + java.lang.Long.parseLong(size.get)
                 }
+                val indexSize = map.get("carbon.indexsize")
+                if (indexSize.isDefined) {
+                  indexLen = indexLen + java.lang.Long.parseLong(indexSize.get)
+                }
+                val indexFiles = map.get("carbon.index.files.name")
+                if (indexFiles.isDefined) {
+                  val indexMap = ObjectSerializationUtil
+                    .convertStringToObject(indexFiles.get)
+                    .asInstanceOf[util.HashMap[String, Set[String]]]
+                  indexMap.asScala.foreach { e =>
+                    var values: util.Set[String] = indexFileNameMap.get(e._1)
+                    if (values == null) {
+                      values = new util.HashSet[String]()
+                      indexFileNameMap.put(e._1, values)
+                    }
+                    values.addAll(e._2.asInstanceOf[util.Set[String]])
+                  }
+                }
                 if (partition.isDefined) {
                   ObjectSerializationUtil
                     .convertStringToObject(partition.get)
@@ -283,13 +303,18 @@
         "carbon.output.partitions.name",
         ObjectSerializationUtil.convertObjectToString(partitions))
       jobContext.getConfiguration.set("carbon.datasize", dataSize.toString)
-
+      jobContext.getConfiguration.set("carbon.indexsize", indexLen.toString)
+      jobContext.getConfiguration
+        .set("carbon.index.files.name",
+          ObjectSerializationUtil.convertObjectToString(indexFileNameMap))
       val newTaskCommits = taskCommits.map { taskCommit =>
         taskCommit.obj match {
           case (map: Map[String, String], set) =>
             new TaskCommitMessage(
-              map
-                .filterNot(e => "carbon.partitions".equals(e._1) || "carbon.datasize".equals(e._1)),
+              map.filterNot(e => "carbon.partitions".equals(e._1) ||
+                                 "carbon.datasize".equals(e._1) ||
+                                 "carbon.indexsize".equals(e._1) ||
+                                 "carbon.index.files.name".equals(e._1)),
               set)
           case _ => taskCommit
         }
@@ -313,27 +338,46 @@
       ThreadLocalSessionInfo.unsetAll()
       val partitions: String = taskContext.getConfiguration.get("carbon.output.partitions.name", "")
       val files = taskContext.getConfiguration.get("carbon.output.files.name", "")
+      val indexFileNameMap = new util.HashMap[String, util.Set[String]]()
       var sum = 0L
       var indexSize = 0L
-      if (!StringUtils.isEmpty(files)) {
-        val filesList = ObjectSerializationUtil
-          .convertStringToObject(files)
+      if (!StringUtils.isEmpty(partitions)) {
+        val partitionList = ObjectSerializationUtil
+          .convertStringToObject(partitions)
           .asInstanceOf[util.ArrayList[String]]
           .asScala
-        for (file <- filesList) {
-          if (file.contains(".carbondata")) {
-            sum += java.lang.Long.parseLong(file.substring(file.lastIndexOf(":") + 1))
-          } else if (file.contains(".carbonindex")) {
-            indexSize += java.lang.Long.parseLong(file.substring(file.lastIndexOf(":") + 1))
+        if (!StringUtils.isEmpty(files)) {
+          val filesList = ObjectSerializationUtil
+            .convertStringToObject(files)
+            .asInstanceOf[util.ArrayList[String]]
+            .asScala
+          for (file <- filesList) {
+            if (file.contains(".carbondata")) {
+              sum += java.lang.Long.parseLong(file.substring(file.lastIndexOf(":") + 1))
+            } else if (file.contains(".carbonindex")) {
+              val fileOffset = file.lastIndexOf(":")
+              indexSize += java.lang.Long.parseLong(file.substring(fileOffset + 1))
+              val absoluteFileName = file.substring(0, fileOffset)
+              val indexFileNameOffset = absoluteFileName.lastIndexOf("/")
+              val indexFileName = absoluteFileName.substring(indexFileNameOffset + 1)
+              val matchedPartition = partitionList.find(absoluteFileName.startsWith)
+              var values: util.Set[String] = indexFileNameMap.get(matchedPartition.get)
+              if (values == null) {
+                values = new util.HashSet[String]()
+                indexFileNameMap.put(matchedPartition.get, values)
+              }
+              values.add(indexFileName)
+            }
           }
         }
-      }
-      if (!StringUtils.isEmpty(partitions)) {
+        val indexFileNames = ObjectSerializationUtil.convertObjectToString(indexFileNameMap)
         taskMsg = taskMsg.obj match {
           case (map: Map[String, String], set) =>
             new TaskCommitMessage(
-              map ++ Map("carbon.partitions" -> partitions, "carbon.datasize" -> sum.toString),
-              set)
+              map ++ Map("carbon.partitions" -> partitions,
+                "carbon.datasize" -> sum.toString,
+                "carbon.indexsize" -> indexSize.toString,
+                "carbon.index.files.name" -> indexFileNames), set)
           case _ => taskMsg
         }
       }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index b5580c6..1619dfc 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -147,6 +147,30 @@
     )
   }
 
+  test("test delete for partition table without merge index files for segment") {
+    try {
+      sql("DROP TABLE IF EXISTS iud_db.partition_nomerge_index")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+      sql(
+        s"""CREATE TABLE iud_db.partition_nomerge_index (a INT, b INT) PARTITIONED BY (country
+           |STRING) STORED AS carbondata"""
+          .stripMargin)
+      sql("INSERT INTO iud_db.partition_nomerge_index  PARTITION(country='India') SELECT 1,2")
+      sql("INSERT INTO iud_db.partition_nomerge_index  PARTITION(country='India') SELECT 3,4")
+      sql("INSERT INTO iud_db.partition_nomerge_index  PARTITION(country='China') SELECT 5,6")
+      sql("INSERT INTO iud_db.partition_nomerge_index  PARTITION(country='China') SELECT 7,8")
+      checkAnswer(sql("select * from iud_db.partition_nomerge_index"),
+        Seq(Row(1, 2, "India"), Row(3, 4, "India"), Row(5, 6, "China"), Row(7, 8, "China")))
+      sql("DELETE FROM iud_db.partition_nomerge_index WHERE b = 4")
+      checkAnswer(sql("select * from iud_db.partition_nomerge_index"),
+        Seq(Row(1, 2, "India"), Row(5, 6, "China"), Row(7, 8, "China")))
+    } finally {
+      CarbonProperties.getInstance()
+        .removeProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT)
+    }
+  }
+
   test("Records more than one pagesize after delete operation ") {
     sql("DROP TABLE IF EXISTS carbon2")
     import sqlContext.implicits._
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 5fc15d8..2f455af 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -891,6 +891,30 @@
     sql("drop table if exists test_return_row_count_source")
   }
 
+  test("test update for partition table without merge index files for segment") {
+    try {
+      sql("DROP TABLE IF EXISTS iud.partition_nomerge_index")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+      sql(
+        s"""CREATE TABLE iud.partition_nomerge_index (a INT, b INT) PARTITIONED BY (country
+           |STRING) STORED AS carbondata"""
+          .stripMargin)
+      sql("INSERT INTO iud.partition_nomerge_index  PARTITION(country='India') SELECT 1,2")
+      sql("INSERT INTO iud.partition_nomerge_index  PARTITION(country='India') SELECT 3,4")
+      sql("INSERT INTO iud.partition_nomerge_index  PARTITION(country='China') SELECT 5,6")
+      sql("INSERT INTO iud.partition_nomerge_index  PARTITION(country='China') SELECT 7,8")
+      checkAnswer(sql("select * from iud.partition_nomerge_index"),
+        Seq(Row(1, 2, "India"), Row(3, 4, "India"), Row(5, 6, "China"), Row(7, 8, "China")))
+      sql("UPDATE iud.partition_nomerge_index SET (b)=(1)")
+      checkAnswer(sql("select * from iud.partition_nomerge_index"),
+        Seq(Row(1, 1, "India"), Row(3, 1, "India"), Row(5, 1, "China"), Row(7, 1, "China")))
+    } finally {
+      CarbonProperties.getInstance()
+        .removeProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT)
+    }
+  }
+
   override def afterAll {
     sql("use default")
     sql("drop database  if exists iud cascade")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index e23d375..5e2acaa 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -566,6 +566,25 @@
     assert(ex.getMessage().equalsIgnoreCase("Cannot use all columns for partition columns;"))
   }
 
+  test("test partition without merge index files for segment") {
+    try {
+      sql("DROP TABLE IF EXISTS new_par")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+      sql(
+        s"""CREATE TABLE new_par (a INT, b INT) PARTITIONED BY (country STRING) STORED AS
+           |carbondata""".stripMargin)
+      sql("INSERT INTO new_par PARTITION(country='India') SELECT 1,2")
+      sql("INSERT INTO new_par PARTITION(country='India') SELECT 3,4")
+      sql("INSERT INTO new_par PARTITION(country='China') SELECT 5,6")
+      sql("INSERT INTO new_par PARTITION(country='China') SELECT 7,8")
+      checkAnswer(sql("SELECT COUNT(*) FROM new_par"), Seq(Row(4)))
+    } finally {
+      CarbonProperties.getInstance()
+        .removeProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT)
+    }
+  }
+
   def verifyInsertForPartitionTable(tableName: String, sort_scope: String): Unit = {
     sql(s"drop table if exists $tableName")
     sql(