[CARBONDATA-3646] [CARBONDATA-3647]: Fix query failure with Index Server

Problems:
1. A select * query fails when using the Index Server.
2. A filter query fails with the Index Server when data is loaded with
global_sort_partition = 100000, throwing a NullPointerException.

Solution:
1. The Indexservertmp folder configuration has been changed. The Indexservertmp
folder is now created outside the table path, as it is not recommended to keep it inside the table.
2. table.getAbsoluteTableIdentifier() threw a NullPointerException
when the table was NULL; this case is now handled.

This closes #3537
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
index 1ca9697..4d47565 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
@@ -24,6 +24,7 @@
 import java.util.Set;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
@@ -221,9 +222,11 @@
    */
   private FilterResolverIntf resolveFilter() {
     try {
+      AbsoluteTableIdentifier absoluteTableIdentifier =
+              table != null ? table.getAbsoluteTableIdentifier() : null;
       FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
       return filterExpressionProcessor
-          .getFilterResolver(expression, table.getAbsoluteTableIdentifier());
+          .getFilterResolver(expression, absoluteTableIdentifier);
     } catch (Exception e) {
       throw new RuntimeException("Error while resolving filter expression", e);
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 50c7d6b..7c3ce5f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -718,7 +718,7 @@
         UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName());
         SegmentRefreshInfo segmentRefreshInfo;
         if (updateVO != null && updateVO.getLatestUpdateTimestamp() != null) {
-          segmentRefreshInfo = new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 0);
+          segmentRefreshInfo = new SegmentRefreshInfo(updateVO.getLatestUpdateTimestamp(), 0);
         } else {
           segmentRefreshInfo = new SegmentRefreshInfo(0L, 0);
         }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
index ad4f804..8b51834 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlockletWrapper.java
@@ -78,7 +78,8 @@
     // executor to driver, in case of any failure data will send through network
     if (bytes.length > serializeAllowedSize && isWriteToFile) {
       final String fileName = UUID.randomUUID().toString();
-      String folderPath = CarbonUtil.getIndexServerTempPath(tablePath, queryId);
+      String folderPath = CarbonUtil.getIndexServerTempPath()
+              + CarbonCommonConstants.FILE_SEPARATOR + queryId;
       try {
         final CarbonFile carbonFile = FileFactory.getCarbonFile(folderPath);
         boolean isFolderExists = true;
@@ -178,7 +179,8 @@
       if (isWrittenToFile) {
         DataInputStream stream = null;
         try {
-          final String folderPath = CarbonUtil.getIndexServerTempPath(tablePath, queryId);
+          final String folderPath = CarbonUtil.getIndexServerTempPath()
+                  + CarbonCommonConstants.FILE_SEPARATOR + queryId;
           String fileName = new String(bytes, CarbonCommonConstants.DEFAULT_CHARSET);
           stream = FileFactory
               .getDataInputStream(folderPath + "/" + fileName);
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index cde9d8a..63a3c04 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -94,7 +94,7 @@
       throws IOException {
     SegmentRefreshInfo segmentRefreshInfo;
     if (updateVo != null) {
-      segmentRefreshInfo = new SegmentRefreshInfo(updateVo.getCreatedOrUpdatedTimeStamp(), 0);
+      segmentRefreshInfo = new SegmentRefreshInfo(updateVo.getLatestUpdateTimestamp(), 0);
     } else {
       segmentRefreshInfo = new SegmentRefreshInfo(0L, 0);
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index babbafa..79daeaa 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -3320,29 +3320,42 @@
     return UUID.randomUUID().toString();
   }
 
-  public static String getIndexServerTempPath(String tablePath, String queryId) {
+  public static String getIndexServerTempPath() {
     String tempFolderPath = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_TEMP_PATH);
     if (null == tempFolderPath) {
       tempFolderPath =
-          tablePath + "/" + CarbonCommonConstants.INDEX_SERVER_TEMP_FOLDER_NAME + "/" + queryId;
+          "/tmp/" + CarbonCommonConstants.INDEX_SERVER_TEMP_FOLDER_NAME;
     } else {
       tempFolderPath =
-          tempFolderPath + "/" + CarbonCommonConstants.INDEX_SERVER_TEMP_FOLDER_NAME + "/"
-              + queryId;
+          tempFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+              + CarbonCommonConstants.INDEX_SERVER_TEMP_FOLDER_NAME;
     }
-    return tempFolderPath;
+    return CarbonUtil.checkAndAppendFileSystemURIScheme(tempFolderPath);
   }
 
-  public static CarbonFile createTempFolderForIndexServer(String tablePath, String queryId)
-      throws IOException {
-    final String path = getIndexServerTempPath(tablePath, queryId);
-    CarbonFile file = FileFactory.getCarbonFile(path);
+  public static CarbonFile createTempFolderForIndexServer(String queryId)
+          throws IOException {
+    final String path = getIndexServerTempPath();
+    if (queryId == null) {
+      if (!FileFactory.isFileExist(path)) {
+        // Create the new index server temp directory if it does not exist
+        LOGGER.info("Creating Index Server temp folder:" + path);
+        FileFactory
+                .createDirectoryAndSetPermission(path,
+                        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+      }
+      return null;
+    }
+    CarbonFile file = FileFactory.getCarbonFile(path + CarbonCommonConstants.FILE_SEPARATOR
+            + queryId);
     if (!file.mkdirs()) {
-      LOGGER.info("Unable to create table directory for index server");
+      LOGGER.info("Unable to create table directory: " + path + CarbonCommonConstants.FILE_SEPARATOR
+              + queryId);
       return null;
     } else {
-      LOGGER.info("Created index server temp directory" + path);
+      LOGGER.info("Successfully Created directory: " + path + CarbonCommonConstants.FILE_SEPARATOR
+              + queryId);
       return file;
     }
   }
diff --git a/docs/index-server.md b/docs/index-server.md
index 7eba589..f80f67d 100644
--- a/docs/index-server.md
+++ b/docs/index-server.md
@@ -117,7 +117,7 @@
 be written to file.
 
 The user can set the location for these files by using 'carbon.indexserver.temp.path'. By default
-table path would be used to write the files.
+the files are written to /tmp/indexservertmp.
 
 ## Prepriming
 As each query is responsible for caching the pruned datamaps, thus a lot of execution time is wasted in reading the 
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 3474795..6d8a467 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -49,8 +49,7 @@
       LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
     }
     val splitFolderPath = CarbonUtil
-      .createTempFolderForIndexServer(dataMapFormat.getCarbonTable.getTablePath,
-        dataMapFormat.getQueryId)
+      .createTempFolderForIndexServer(dataMapFormat.getQueryId)
     LOGGER
       .info("Temp folder path for Query ID: " + dataMapFormat.getQueryId + " is " + splitFolderPath)
     val (resonse, time) = logTime {
@@ -67,11 +66,9 @@
           .getExtendedBlockets(dataMapFormat.getCarbonTable.getTablePath, dataMapFormat
             .getQueryId, dataMapFormat.isCountStarJob)
       } finally {
-        val tmpPath = CarbonUtil
-          .getIndexServerTempPath(dataMapFormat.getCarbonTable.getTablePath,
-            dataMapFormat.getQueryId)
         if (null != splitFolderPath && !splitFolderPath.deleteFile()) {
-          LOGGER.error("Problem while deleting the temp directory:" + tmpPath)
+          LOGGER.error("Problem while deleting the temp directory:"
+            + splitFolderPath.getAbsolutePath)
         }
       }
     }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 2bd6c11..855cb87 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -241,6 +241,8 @@
       })
       CarbonProperties.getInstance().addProperty(CarbonCommonConstants
         .CARBON_ENABLE_INDEX_SERVER, "true")
+      CarbonProperties.getInstance().addNonSerializableProperty(CarbonCommonConstants
+        .IS_DRIVER_INSTANCE, "true")
       LOGGER.info(s"Index cache server running on ${ server.getPort } port")
     }
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 6afee71..6cbb73e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -26,7 +26,6 @@
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
-import org.apache.spark.sql.execution.command.cache._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.listeners.{AlterDataMaptableCompactionPostListener, DataMapAddColumnsPreListener, DataMapAlterTableDropPartitionMetaListener, DataMapAlterTableDropPartitionPreStatusListener, DataMapChangeDataTypeorRenameColumnPreListener, DataMapDeleteSegmentPreListener, DataMapDropColumnPreListener, DropCacheBloomEventListener, DropCacheDataMapEventListener, LoadMVTablePreListener, LoadPostDataMapListener, PrePrimingEventListener, ShowCacheDataMapEventListener, ShowCachePreMVEventListener}
@@ -71,6 +70,8 @@
       properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
     }
     LOGGER.info(s"Initializing CarbonEnv, store location: $storePath")
+    // Create the index server temp folder where splits for select queries are written
+    CarbonUtil.createTempFolderForIndexServer(null);
 
     sparkSession.udf.register("getTupleId", () => "")
     // added for handling MV table creation. when user will fire create ddl for
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
index 7b8e10f..490e03b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
@@ -49,18 +49,19 @@
     OperationListenerBus.getInstance.fireEvent(dropCacheEvent, operationContext)
 
     val cache = CacheProvider.getInstance().getCarbonCache
+    // Clear the cache from the Index Server
+    if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable.getDatabaseName,
+      carbonTable.getTableName)) {
+      LOGGER.info("Clearing cache from IndexServer")
+      DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME)
+    }
     if (cache != null) {
-      // Get all Index files for the specified table.
-      if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable.getDatabaseName,
-        carbonTable.getTableName)) {
-        DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME)
-      } else {
-        // Extract dictionary keys for the table and create cache keys from those
-        val dictKeys: List[String] = CacheUtil.getAllDictCacheKeys(carbonTable)
-        // Remove elements from cache
-        cache.removeAll(dictKeys.asJava)
-        DataMapStoreManager.getInstance().clearDataMaps(carbonTable.getAbsoluteTableIdentifier)
-      }
+      LOGGER.info("Clearing cache from driver side")
+      // Create cache keys from the extracted dictionary keys for the table.
+      val dictKeys: List[String] = CacheUtil.getAllDictCacheKeys(carbonTable)
+      // Remove elements from cache
+      cache.removeAll(dictKeys.asJava)
+      DataMapStoreManager.getInstance().clearDataMaps(carbonTable.getAbsoluteTableIdentifier)
     }
     LOGGER.info("Drop cache request served for table " + carbonTable.getTableUniqueName)
   }