[CARBONDATA-3946] Support IndexServer with Presto Engine

Why is this PR needed?
Currently, when indexserver is enabled with presto, query will through NPE

What changes were proposed in this PR?
Use Config from Job to get index server client
Get QueryId from Presto and set it to indexFormat

This closes #3885
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8864963..a271da6 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2472,4 +2472,14 @@
   public static final String STRING_LENGTH_EXCEEDED_MESSAGE =
       "Record %s of column %s exceeded " + MAX_CHARS_PER_COLUMN_DEFAULT +
           " characters. Please consider long string data type.";
+
+  /**
+   * property which defines the presto query
+   */
+  @CarbonProperty public static final String IS_QUERY_FROM_PRESTO = "is_query_from_presto";
+
+  /**
+   * property which defines the presto query default value
+   */
+  public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false";
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java b/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
index fbf023a..44f2df6 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
@@ -23,6 +23,7 @@
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 /**
@@ -36,7 +37,8 @@
   }
 
   @Override
-  public List<ExtendedBlocklet> execute(IndexInputFormat indexInputFormat) {
+  public List<ExtendedBlocklet> execute(IndexInputFormat indexInputFormat,
+      Configuration configuration) {
     return null;
   }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexJob.java b/core/src/main/java/org/apache/carbondata/core/index/IndexJob.java
index 608f989..61cfde0 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexJob.java
@@ -24,6 +24,7 @@
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 /**
@@ -34,8 +35,8 @@
 
   void execute(CarbonTable carbonTable, FileInputFormat<Void, BlockletIndexWrapper> format);
 
-  List<ExtendedBlocklet> execute(IndexInputFormat indexInputFormat);
+  List<ExtendedBlocklet> execute(IndexInputFormat indexInputFormat, Configuration configuration);
 
-  Long executeCountJob(IndexInputFormat indexInputFormat);
+  Long executeCountJob(IndexInputFormat indexInputFormat, Configuration configuration);
 
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
index 28449c4..56ee810 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexUtil.java
@@ -122,7 +122,7 @@
         new IndexInputFormat(carbonTable, validAndInvalidSegmentsInfo.getValidSegments(),
             invalidSegment, true, indexToClear);
     try {
-      indexJob.execute(indexInputFormat);
+      indexJob.execute(indexInputFormat, null);
     } catch (Exception e) {
       // Consider a scenario where clear index job is called from drop table
       // and index server crashes, in this no exception should be thrown and
@@ -273,9 +273,10 @@
   public static List<ExtendedBlocklet> executeIndexJob(CarbonTable carbonTable,
       FilterResolverIntf resolver, IndexJob indexJob, List<PartitionSpec> partitionsToPrune,
       List<Segment> validSegments, List<Segment> invalidSegments, IndexLevel level,
-      List<String> segmentsToBeRefreshed) {
+      List<String> segmentsToBeRefreshed, Configuration configuration) {
     return executeIndexJob(carbonTable, resolver, indexJob, partitionsToPrune, validSegments,
-        invalidSegments, level, false, segmentsToBeRefreshed, false);
+        invalidSegments, level, false, segmentsToBeRefreshed, false,
+        configuration);
   }
 
   /**
@@ -286,7 +287,8 @@
   public static List<ExtendedBlocklet> executeIndexJob(CarbonTable carbonTable,
       FilterResolverIntf resolver, IndexJob indexJob, List<PartitionSpec> partitionsToPrune,
       List<Segment> validSegments, List<Segment> invalidSegments, IndexLevel level,
-      Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob) {
+      Boolean isFallbackJob, List<String> segmentsToBeRefreshed, boolean isCountJob,
+      Configuration configuration) {
     List<String> invalidSegmentNo = new ArrayList<>();
     for (Segment segment : invalidSegments) {
       invalidSegmentNo.add(segment.getSegmentNo());
@@ -299,7 +301,7 @@
       indexInputFormat.setCountStarJob();
       indexInputFormat.setIsWriteToFile(false);
     }
-    return indexJob.execute(indexInputFormat);
+    return indexJob.execute(indexInputFormat, configuration);
   }
 
   public static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
diff --git a/docs/prestodb-guide.md b/docs/prestodb-guide.md
index 7b2b2a9..0e45108 100644
--- a/docs/prestodb-guide.md
+++ b/docs/prestodb-guide.md
@@ -301,3 +301,6 @@
 During reading, it supports the non-distributed indexes like block index and bloom index.
 It doesn't support Materialized View as it needs query plan to be changed and presto does not allow it.
 Also, Presto carbon supports streaming segment read from streaming table created by spark.
+
+Presto also supports caching block/blocklet indexes in distributed index server. Refer 
+[Presto Setup with CarbonData Distributed IndexServer](./prestosql-guide.md#presto-setup-with-carbondata-distributed-indexserver)
diff --git a/docs/prestosql-guide.md b/docs/prestosql-guide.md
index 11bb385..ff05379 100644
--- a/docs/prestosql-guide.md
+++ b/docs/prestosql-guide.md
@@ -24,6 +24,8 @@
 
 [Presto Single Node Setup for Carbondata](#presto-single-node-setup-for-carbondata)
 
+[Presto Setup with CarbonData Distributed IndexServer](#presto-setup-with-carbondata-distributed-indexserver)
+
 ## Presto Multinode Cluster Setup for Carbondata
 ### Installing Presto
 
@@ -301,3 +303,21 @@
 During reading, it supports the non-distributed index like block index and bloom index.
 It doesn't support Materialized View as it needs query plan to be changed and presto does not allow it.
 Also, Presto carbon supports streaming segment read from streaming table created by spark.
+
+## Presto Setup with CarbonData Distributed IndexServer
+
+### Dependency jars
+After copying all the jars from ../integration/presto/target/carbondata-presto-X.Y.Z-SNAPSHOT 
+to `plugin/carbondata` directory on all nodes, ensure copying the following jars as well.
+1. Copy ../integration/spark/target/carbondata-spark_X.Y.Z-SNAPSHOT.jar
+2. Copy corresponding Spark dependency jars to the location.
+
+### Configure properties
+Configure IndexServer configurations in carbon.properties file. Refer 
+[Configuring IndexServer](https://github.com/apache/carbondata/blob/master/docs/index-server.md#Configurations) for more info.
+Add  `-Dcarbon.properties.filepath=<path>/carbon.properties` in jvm.config file. 
+
+### Presto with IndexServer
+Start distributed index server. Launch presto CLI and fire SELECT query and check if the corresponding job
+is fired in the index server application.  Users can use spark to view the cache loaded by using
+show metacache command. Refer: [MetaCacheDDL](./ddl-of-carbondata.md#cache)
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 557fbfa..43cbe1f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -391,8 +391,8 @@
    * table. If the job fails for some reason then an embedded job is fired to
    * get the count.
    */
-  Long getDistributedCount(CarbonTable table,
-      List<PartitionSpec> partitionNames, List<Segment> validSegments) {
+  Long getDistributedCount(CarbonTable table, List<PartitionSpec> partitionNames,
+      List<Segment> validSegments, Configuration configuration) {
     IndexInputFormat indexInputFormat =
         new IndexInputFormat(table, null, validSegments, new ArrayList<>(),
             partitionNames, false, null, false, false);
@@ -402,25 +402,26 @@
       if (indexJob == null) {
         throw new ExceptionInInitializerError("Unable to create index job");
       }
-      return indexJob.executeCountJob(indexInputFormat);
+      return indexJob.executeCountJob(indexInputFormat, configuration);
     } catch (Exception e) {
       LOG.error("Failed to get count from index server. Initializing fallback", e);
       IndexJob indexJob = IndexUtil.getEmbeddedJob();
-      return indexJob.executeCountJob(indexInputFormat);
+      return indexJob.executeCountJob(indexInputFormat, configuration);
     }
   }
 
   List<ExtendedBlocklet> getDistributedBlockRowCount(CarbonTable table,
       List<PartitionSpec> partitionNames, List<Segment> validSegments,
-      List<Segment> invalidSegments, List<String> segmentsToBeRefreshed) {
+      List<Segment> invalidSegments, List<String> segmentsToBeRefreshed,
+      Configuration configuration) {
     return getDistributedSplit(table, null, partitionNames, validSegments, invalidSegments,
-        segmentsToBeRefreshed, true);
+        segmentsToBeRefreshed, true, configuration);
   }
 
   private List<ExtendedBlocklet> getDistributedSplit(CarbonTable table,
       FilterResolverIntf filterResolverIntf, List<PartitionSpec> partitionNames,
       List<Segment> validSegments, List<Segment> invalidSegments,
-      List<String> segmentsToBeRefreshed, boolean isCountJob) {
+      List<String> segmentsToBeRefreshed, boolean isCountJob, Configuration configuration) {
     try {
       IndexJob indexJob = (IndexJob) IndexUtil.createIndexJob(IndexUtil.DISTRIBUTED_JOB_NAME);
       if (indexJob == null) {
@@ -428,7 +429,7 @@
       }
       return IndexUtil
           .executeIndexJob(table, filterResolverIntf, indexJob, partitionNames, validSegments,
-              invalidSegments, null, false, segmentsToBeRefreshed, isCountJob);
+              invalidSegments, null, false, segmentsToBeRefreshed, isCountJob, configuration);
     } catch (Exception e) {
       // Check if fallback is disabled for testing purposes then directly throw exception.
       if (CarbonProperties.getInstance().isFallBackDisabled()) {
@@ -436,9 +437,10 @@
       }
       LOG.error("Exception occurred while getting splits using index server. Initiating Fall "
           + "back to embedded mode", e);
-      return IndexUtil.executeIndexJob(table, filterResolverIntf,
-          IndexUtil.getEmbeddedJob(), partitionNames, validSegments,
-          invalidSegments, null, true, segmentsToBeRefreshed, isCountJob);
+      return IndexUtil
+          .executeIndexJob(table, filterResolverIntf, IndexUtil.getEmbeddedJob(), partitionNames,
+              validSegments, invalidSegments, null, true, segmentsToBeRefreshed, isCountJob,
+              configuration);
     }
   }
 
@@ -528,7 +530,7 @@
       try {
         prunedBlocklets =
             getDistributedSplit(carbonTable, filter.getResolver(), partitionsToPrune, segmentIds,
-                invalidSegments, segmentsToBeRefreshed, false);
+                invalidSegments, segmentsToBeRefreshed, false, job.getConfiguration());
       } catch (Exception e) {
         // Check if fallback is disabled then directly throw exception otherwise try driver
         // pruning.
@@ -568,7 +570,8 @@
           if (distributedCG && indexJob != null) {
             cgPrunedBlocklets = IndexUtil
                 .executeIndexJob(carbonTable, filter.getResolver(), indexJob, partitionsToPrune,
-                    segmentIds, invalidSegments, IndexLevel.CG, new ArrayList<>());
+                    segmentIds, invalidSegments, IndexLevel.CG, new ArrayList<>(),
+                    job.getConfiguration());
           } else {
             cgPrunedBlocklets = cgIndexExprWrapper.prune(segmentIds, partitionsToPrune);
           }
@@ -602,9 +605,10 @@
           // Prune segments from already pruned blocklets
           IndexUtil.pruneSegments(segmentIds, prunedBlocklets);
           // Prune segments from already pruned blocklets
-          fgPrunedBlocklets = IndexUtil.executeIndexJob(
-              carbonTable, filter.getResolver(), indexJob, partitionsToPrune, segmentIds,
-              invalidSegments, fgIndexExprWrapper.getIndexLevel(), new ArrayList<>());
+          fgPrunedBlocklets = IndexUtil
+              .executeIndexJob(carbonTable, filter.getResolver(), indexJob, partitionsToPrune,
+                  segmentIds, invalidSegments, fgIndexExprWrapper.getIndexLevel(),
+                  new ArrayList<>(), job.getConfiguration());
           // note that the 'fgPrunedBlocklets' has extra index related info compared with
           // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets'
           prunedBlocklets =
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 2dd52a4..e3aacc0 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -482,7 +482,7 @@
         try {
           List<ExtendedBlocklet> extendedBlocklets =
               getDistributedBlockRowCount(table, partitions, filteredSegment,
-                  allSegments.getInvalidSegments(), toBeCleanedSegments);
+                  allSegments.getInvalidSegments(), toBeCleanedSegments, job.getConfiguration());
           for (ExtendedBlocklet blocklet : extendedBlocklets) {
             String filePath = blocklet.getFilePath().replace("\\", "/");
             String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
@@ -538,7 +538,7 @@
       if (CarbonProperties.getInstance()
           .isDistributedPruningEnabled(table.getDatabaseName(), table.getTableName())) {
         totalRowCount =
-            getDistributedCount(table, partitions, filteredSegment);
+            getDistributedCount(table, partitions, filteredSegment, job.getConfiguration());
       } else {
         TableIndex defaultIndex = IndexStoreManager.getInstance().getDefaultIndex(table);
         totalRowCount = defaultIndex.getRowCount(filteredSegment, partitions, defaultIndex);
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index dc50b8e..815775c 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -153,7 +153,9 @@
         ColumnarFormatVersion.valueOf(carbonLocalInputSplit.getVersion()),
         carbonLocalInputSplit.getDeleteDeltaFiles());
     inputSplit.setFormat(carbonLocalInputSplit.getFileFormat());
-    if (FileFormat.COLUMNAR_V3.ordinal() == inputSplit.getFileFormat().ordinal()) {
+    if (FileFormat.COLUMNAR_V3.ordinal() == inputSplit.getFileFormat().ordinal()
+        && null != carbonLocalInputSplit.detailInfo && !carbonLocalInputSplit.detailInfo
+        .equalsIgnoreCase("null")) {
       Gson gson = new Gson();
       BlockletDetailInfo blockletDetailInfo =
           gson.fromJson(carbonLocalInputSplit.detailInfo, BlockletDetailInfo.class);
diff --git a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataModule.java b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataModule.java
index 81b2476..56b52b0 100755
--- a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataModule.java
+++ b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataModule.java
@@ -167,6 +167,10 @@
     binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
 
     configBinder(binder).bindConfig(ParquetFileWriterConfig.class);
+
+    // configure carbon properties
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.IS_QUERY_FROM_PRESTO, "true");
   }
 
 }
diff --git a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java
index 5bcadd9..fbb387f 100755
--- a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -101,6 +101,7 @@
     HiveTableLayoutHandle layout = (HiveTableLayoutHandle) layoutHandle;
     SchemaTableName schemaTableName = layout.getSchemaTableName();
 
+    carbonTableReader.setPrestoQueryId(session.getQueryId());
     // get table metadata
     SemiTransactionalHiveMetastore metastore =
         metastoreProvider.apply((HiveTransactionHandle) transactionHandle);
diff --git a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java
index a571bef..d4d4e88 100755
--- a/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/prestodb/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -103,6 +103,11 @@
   private String queryId;
 
   /**
+   * presto cli query id
+   */
+  private String prestoQueryId;
+
+  /**
    * Logger instance
    */
   private static final Logger LOGGER =
@@ -256,6 +261,7 @@
     List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
     CarbonTable carbonTable = tableCacheModel.getCarbonTable();
     TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set("presto.cli.query.id", prestoQueryId);
     config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
     String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
@@ -397,4 +403,9 @@
   public void setQueryId(String queryId) {
     this.queryId = queryId;
   }
+
+  public void setPrestoQueryId(String prestoQueryId) {
+    this.prestoQueryId = prestoQueryId;
+  }
+
 }
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
index b8c3c0b..b7128da 100755
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataModule.java
@@ -21,6 +21,8 @@
 
 import static java.util.Objects.requireNonNull;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
 
 import com.google.inject.Binder;
@@ -162,6 +164,10 @@
     binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
 
     configBinder(binder).bindConfig(ParquetFileWriterConfig.class);
+
+    // configure carbon properties
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.IS_QUERY_FROM_PRESTO, "true");
   }
 
 }
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
index fe1dd9a..76c3fd0 100755
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -110,6 +110,7 @@
     HiveTableHandle hiveTable = (HiveTableHandle) tableHandle;
     SchemaTableName schemaTableName = hiveTable.getSchemaTableName();
 
+    carbonTableReader.setPrestoQueryId(session.getQueryId());
     // get table metadata
     SemiTransactionalHiveMetastore metastore =
         metastoreProvider.apply((HiveTransactionHandle) transactionHandle);
diff --git a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 7364942..de59a9f 100755
--- a/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -101,6 +101,11 @@
   private String queryId;
 
   /**
+   * presto cli query id
+   */
+  private String prestoQueryId;
+
+  /**
    * Logger instance
    */
   private static final Logger LOGGER =
@@ -254,6 +259,7 @@
     List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
     CarbonTable carbonTable = tableCacheModel.getCarbonTable();
     TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
+    config.set("presto.cli.query.id", prestoQueryId);
     config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
     String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
@@ -395,4 +401,9 @@
   public void setQueryId(String queryId) {
     this.queryId = queryId;
   }
+
+  public void setPrestoQueryId(String prestoQueryId) {
+    this.prestoQueryId = prestoQueryId;
+  }
+
 }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
index 6957a60..a81c202 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexJobs.scala
@@ -20,11 +20,13 @@
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.log4j.Logger
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.util.SizeEstimator
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.index.{AbstractIndexJob, IndexInputFormat}
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -32,7 +34,7 @@
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.scan.filter.resolver.{FilterResolverIntf, LogicalFilterResolverImpl, RowLevelFilterResolverImpl}
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.spark.util.CarbonScalaUtil.logTime
 
 /**
@@ -43,7 +45,8 @@
 
   val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  override def execute(indexFormat: IndexInputFormat): util.List[ExtendedBlocklet] = {
+  override def execute(indexFormat: IndexInputFormat,
+      configuration: Configuration): util.List[ExtendedBlocklet] = {
     if (LOGGER.isDebugEnabled) {
       val messageSize = SizeEstimator.estimate(indexFormat)
       LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
@@ -54,15 +57,32 @@
       .info("Temp folder path for Query ID: " + indexFormat.getQueryId + " is " + splitFolderPath)
     val (response, time) = logTime {
       try {
-        val spark = SparkSQLUtil.getSparkSession
-        indexFormat.setTaskGroupId(SparkSQLUtil.getTaskGroupId(spark))
-        indexFormat.setTaskGroupDesc(SparkSQLUtil.getTaskGroupDesc(spark))
+        val isQueryFromPresto = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.IS_QUERY_FROM_PRESTO,
+            CarbonCommonConstants.IS_QUERY_FROM_PRESTO_DEFAULT)
+          .toBoolean
+        // In case of presto with index server flow, sparkSession will be null
+        if (!isQueryFromPresto) {
+          val spark = SparkSQLUtil.getSparkSession
+          indexFormat.setTaskGroupId(SparkSQLUtil.getTaskGroupId(spark))
+          indexFormat.setTaskGroupDesc(SparkSQLUtil.getTaskGroupDesc(spark))
+        } else {
+          val queryId = configuration.get("presto.cli.query.id")
+          if (null != queryId) {
+            indexFormat.setTaskGroupId(queryId)
+          }
+        }
         var filterInf = indexFormat.getFilterResolverIntf
         val filterProcessor = new FilterExpressionProcessor
         filterInf = removeSparkUnknown(filterInf,
           indexFormat.getCarbonTable.getAbsoluteTableIdentifier, filterProcessor)
         indexFormat.setFilterResolverIntf(filterInf)
-        IndexServer.getClient.getSplits(indexFormat)
+        val client = if (isQueryFromPresto) {
+          IndexServer.getClient(configuration)
+        } else {
+          IndexServer.getClient
+        }
+        client.getSplits(indexFormat)
           .getExtendedBlocklets(indexFormat.getCarbonTable.getTablePath, indexFormat
             .getQueryId, indexFormat.isCountStarJob)
       } finally {
@@ -110,8 +130,18 @@
     filterInf
   }
 
-  override def executeCountJob(indexFormat: IndexInputFormat): java.lang.Long = {
-    IndexServer.getClient.getCount(indexFormat).get()
+  override def executeCountJob(indexFormat: IndexInputFormat,
+      configuration: Configuration): java.lang.Long = {
+    val isQueryFromPresto = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.IS_QUERY_FROM_PRESTO,
+        CarbonCommonConstants.IS_QUERY_FROM_PRESTO_DEFAULT)
+      .toBoolean
+    val client = if (isQueryFromPresto) {
+      IndexServer.getClient(configuration)
+    } else {
+      IndexServer.getClient
+    }
+    client.getCount(indexFormat).get()
   }
 }
 
@@ -121,7 +151,8 @@
  */
 class EmbeddedIndexJob extends AbstractIndexJob {
 
-  override def execute(indexFormat: IndexInputFormat): util.List[ExtendedBlocklet] = {
+  override def execute(indexFormat: IndexInputFormat,
+      configuration: Configuration): util.List[ExtendedBlocklet] = {
     val spark = SparkSQLUtil.getSparkSession
     val originalJobDesc = spark.sparkContext.getLocalProperty("spark.job.description")
     indexFormat.setIsWriteToFile(false)
@@ -137,7 +168,8 @@
     splits
   }
 
-  override def executeCountJob(inputFormat: IndexInputFormat): java.lang.Long = {
+  override def executeCountJob(inputFormat: IndexInputFormat,
+      configuration: Configuration): java.lang.Long = {
     inputFormat.setFallbackJob()
     IndexServer.getCount(inputFormat).get()
   }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 9798e80..0d32007 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -305,6 +305,14 @@
   def getClient: ServerInterface = {
     val sparkSession = SparkSQLUtil.getSparkSession
     val configuration = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
+    getClient(configuration)
+  }
+
+  /**
+   * @return Return a new Client to communicate with the Index Server.
+   */
+  def getClient(configuration: Configuration): ServerInterface = {
+
     import org.apache.hadoop.ipc.RPC
     RPC.getProtocolProxy(classOf[ServerInterface],
       RPC.getProtocolVersion(classOf[ServerInterface]),
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/SparkBlockletIndexLoaderJob.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/SparkBlockletIndexLoaderJob.scala
index 7cb1750..b50b3d9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/SparkBlockletIndexLoaderJob.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/jobs/SparkBlockletIndexLoaderJob.scala
@@ -90,7 +90,10 @@
     }
   }
 
-  override def executeCountJob(indexFormat: IndexInputFormat): lang.Long = 0L
+  override def executeCountJob(indexFormat: IndexInputFormat,
+      configuration: Configuration): lang.Long = {
+    0L
+  }
 }
 
 class IndexCacher(
diff --git a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
index 8f4da6a..b36f7ea 100644
--- a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
+++ b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
@@ -20,9 +20,13 @@
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
+
 import mockit.{Mock, MockUp}
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
 import org.apache.carbondata.core.index.{IndexInputFormat, Segment}
 import org.apache.carbondata.core.index.dev.expr.IndexInputSplitWrapper
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexInputSplit
@@ -144,7 +148,7 @@
       }
     }
     try{
-      distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance)
+      distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance, new Configuration())
     } catch {
       case ex: Exception =>
     }
@@ -176,7 +180,7 @@
       }
     }
     try{
-      distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance)
+      distributedRDDUtilsTest.execute(mockDataMapFormat.getMockInstance, new Configuration())
     } catch {
       case ex: Exception =>
     }