PHOENIX-6694 Avoid unnecessary calls of fetching table meta data to region servers holding the system tables in batch oriented jobs in spark or hive otherwise those RS become hotspot

Co-authored-by: Rajeshbabu Chintaguntla <rajeshbabu@apache.org>
diff --git a/phoenix-spark-base/pom.xml b/phoenix-spark-base/pom.xml
index f9d2761..353841c 100644
--- a/phoenix-spark-base/pom.xml
+++ b/phoenix-spark-base/pom.xml
@@ -439,6 +439,12 @@
       <artifactId>slf4j-log4j12</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>2.5.0</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
index 5b3df46..003a34d 100644
--- a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
+++ b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
@@ -27,9 +27,10 @@
     private final String scn;
     private final String selectStatement;
     private final Properties overriddenProps;
+    private final byte[] pTableCacheBytes;
 
     PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
-            String selectStatement, Properties overriddenProps) {
+            String selectStatement, Properties overriddenProps, byte[] pTableCacheBytes) {
         if(overriddenProps == null){
             throw new NullPointerException();
         }
@@ -38,6 +39,7 @@
         this.tenantId = tenantId;
         this.selectStatement = selectStatement;
         this.overriddenProps = overriddenProps;
+        this.pTableCacheBytes = pTableCacheBytes;
     }
 
     String getSelectStatement() {
@@ -59,4 +61,8 @@
     Properties getOverriddenProps() {
         return overriddenProps;
     }
+
+    byte[] getPTableCacheBytes() {
+        return pTableCacheBytes;
+    }
 }
diff --git a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
index 79d8ba2..3ee821b 100644
--- a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
+++ b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -23,12 +23,15 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compat.CompatUtil;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.generated.PTableProtos.PTable;
 import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.spark.FilterExpressionCompiler;
 import org.apache.phoenix.spark.SparkSchemaUtil;
 import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
@@ -181,10 +184,12 @@
 
                 // Get the region size
                 long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
-
+                byte[] pTableCacheBytes = PTableImpl.toProto(queryPlan.getTableRef().getTable()).
+                    toByteArray();
                 PhoenixDataSourceReadOptions phoenixDataSourceOptions =
-                        new PhoenixDataSourceReadOptions(zkUrl, currentScnValue.orElse(null),
-                                tenantId.orElse(null), selectStatement, overriddenProps);
+                     new PhoenixDataSourceReadOptions(zkUrl, currentScnValue.orElse(null),
+                            tenantId.orElse(null), selectStatement, overriddenProps,
+                         pTableCacheBytes);
                 if (splitByStats) {
                     for (Scan aScan : scans) {
                         partitions.add(getInputPartition(phoenixDataSourceOptions,
diff --git a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
index ec12718..841545c 100644
--- a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
+++ b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.spark.datasource.v2.reader;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -33,6 +34,8 @@
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.generated.PTableProtos;
+import org.apache.phoenix.coprocessor.generated.PTableProtos.PTable;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
 import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
@@ -41,12 +44,14 @@
 import org.apache.phoenix.iterate.RoundRobinResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.spark.SerializableWritable;
 import org.apache.spark.executor.InputMetrics;
@@ -94,6 +99,15 @@
         }
         try (Connection conn = DriverManager.getConnection(
                 JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overridingProps)) {
+            PTable pTable = null;
+            try {
+                pTable = PTable.parseFrom(options.getPTableCacheBytes());
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException("Parsing the PTable Cache Bytes is failing ", e);
+            }
+            org.apache.phoenix.schema.PTable table = PTableImpl.createFromProto(pTable);
+            PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+            phoenixConnection.addTable(table, System.currentTimeMillis());
             final Statement statement = conn.createStatement();
             final String selectStatement = options.getSelectStatement();
             if (selectStatement == null){
diff --git a/phoenix5-spark3/pom.xml b/phoenix5-spark3/pom.xml
index c53c525..13ff51a 100644
--- a/phoenix5-spark3/pom.xml
+++ b/phoenix5-spark3/pom.xml
@@ -119,6 +119,13 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>2.5.0</version>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- Misc dependencies -->
     <dependency>
       <groupId>joda-time</groupId>
@@ -178,4 +185,4 @@
         </plugin>
     </plugins>
   </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java
index 4f7408f..4d49150 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java
@@ -29,9 +29,11 @@
     private final String scn;
     private final String selectStatement;
     private final Properties overriddenProps;
+    private final byte[] pTableCacheBytes;
 
     PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
-                                 String selectStatement, Properties overriddenProps) {
+                                 String selectStatement, Properties overriddenProps,
+        byte[] pTableCacheBytes) {
         if(overriddenProps == null){
             throw new NullPointerException();
         }
@@ -40,6 +42,7 @@
         this.tenantId = tenantId;
         this.selectStatement = selectStatement;
         this.overriddenProps = overriddenProps;
+        this.pTableCacheBytes = pTableCacheBytes;
     }
 
     String getSelectStatement() {
@@ -69,4 +72,8 @@
         }
         return overriddenProps;
     }
+
+    byte[] getPTableCacheBytes() {
+        return pTableCacheBytes;
+    }
 }
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java
index 62c88b5..4cbc237 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.spark.sql.connector.reader;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -33,6 +34,7 @@
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.generated.PTableProtos.PTable;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
 import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
@@ -41,11 +43,13 @@
 import org.apache.phoenix.iterate.RoundRobinResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.spark.executor.InputMetrics;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;
@@ -79,8 +83,18 @@
     private QueryPlan getQueryPlan() throws SQLException {
         String zkUrl = options.getZkUrl();
         Properties overridingProps = getOverriddenPropsFromOptions();
+        overridingProps.put("phoenix.skip.system.tables.existence.check", Boolean.valueOf("true"));
         try (Connection conn = DriverManager.getConnection(
                 JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overridingProps)) {
+            PTable pTable = null;
+            try {
+                pTable = PTable.parseFrom(options.getPTableCacheBytes());
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException("Parsing the PTable Cache Bytes is failing ", e);
+            }
+            org.apache.phoenix.schema.PTable table = PTableImpl.createFromProto(pTable);
+            PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+            phoenixConnection.addTable(table, System.currentTimeMillis());
             final Statement statement = conn.createStatement();
             final String selectStatement = options.getSelectStatement();
             if (selectStatement == null){
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java
index ecdf7f6..451d7b1 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.spark.sql.connector.reader;
 
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -146,10 +147,11 @@
 
                 // Get the region size
                 long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
-
+                byte[] pTableCacheBytes = PTableImpl.toProto(queryPlan.getTableRef().getTable()).
+                    toByteArray();
                 phoenixDataSourceOptions =
                         new PhoenixDataSourceReadOptions(zkUrl, currentScnValue,
-                                tenantId, selectStatement, overriddenProps);
+                                tenantId, selectStatement, overriddenProps, pTableCacheBytes);
 
                 if (splitByStats) {
                     for (org.apache.hadoop.hbase.client.Scan aScan : scans) {