PHOENIX-6694 Avoid unnecessary calls to fetch table metadata from the region servers holding the system tables in batch-oriented jobs in Spark or Hive; otherwise those region servers become a hotspot
Co-authored-by: Rajeshbabu Chintaguntla <rajeshbabu@apache.org>
diff --git a/phoenix-spark-base/pom.xml b/phoenix-spark-base/pom.xml
index f9d2761..353841c 100644
--- a/phoenix-spark-base/pom.xml
+++ b/phoenix-spark-base/pom.xml
@@ -439,6 +439,12 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>2.5.0</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
index 5b3df46..003a34d 100644
--- a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
+++ b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
@@ -27,9 +27,10 @@
private final String scn;
private final String selectStatement;
private final Properties overriddenProps;
+ private final byte[] pTableCacheBytes;
PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
- String selectStatement, Properties overriddenProps) {
+ String selectStatement, Properties overriddenProps, byte[] pTableCacheBytes) {
if(overriddenProps == null){
throw new NullPointerException();
}
@@ -38,6 +39,7 @@
this.tenantId = tenantId;
this.selectStatement = selectStatement;
this.overriddenProps = overriddenProps;
+ this.pTableCacheBytes = pTableCacheBytes;
}
String getSelectStatement() {
@@ -59,4 +61,8 @@
Properties getOverriddenProps() {
return overriddenProps;
}
+
+ byte[] getPTableCacheBytes() {
+ return pTableCacheBytes;
+ }
}
diff --git a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
index 79d8ba2..3ee821b 100644
--- a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
+++ b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -23,12 +23,15 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compat.CompatUtil;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.generated.PTableProtos.PTable;
import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.mapreduce.PhoenixInputSplit;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.spark.FilterExpressionCompiler;
import org.apache.phoenix.spark.SparkSchemaUtil;
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
@@ -181,10 +184,12 @@
// Get the region size
long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
-
+ byte[] pTableCacheBytes = PTableImpl.toProto(queryPlan.getTableRef().getTable()).
+ toByteArray();
PhoenixDataSourceReadOptions phoenixDataSourceOptions =
- new PhoenixDataSourceReadOptions(zkUrl, currentScnValue.orElse(null),
- tenantId.orElse(null), selectStatement, overriddenProps);
+ new PhoenixDataSourceReadOptions(zkUrl, currentScnValue.orElse(null),
+ tenantId.orElse(null), selectStatement, overriddenProps,
+ pTableCacheBytes);
if (splitByStats) {
for (Scan aScan : scans) {
partitions.add(getInputPartition(phoenixDataSourceOptions,
diff --git a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
index ec12718..841545c 100644
--- a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
+++ b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.spark.datasource.v2.reader;
+import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -33,6 +34,8 @@
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.generated.PTableProtos;
+import org.apache.phoenix.coprocessor.generated.PTableProtos.PTable;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
@@ -41,12 +44,14 @@
import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.mapreduce.PhoenixInputSplit;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.spark.SerializableWritable;
import org.apache.spark.executor.InputMetrics;
@@ -94,6 +99,15 @@
}
try (Connection conn = DriverManager.getConnection(
JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overridingProps)) {
+ PTable pTable = null;
+ try {
+ pTable = PTable.parseFrom(options.getPTableCacheBytes());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Parsing the PTable Cache Bytes is failing ", e);
+ }
+ org.apache.phoenix.schema.PTable table = PTableImpl.createFromProto(pTable);
+ PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+ phoenixConnection.addTable(table, System.currentTimeMillis());
final Statement statement = conn.createStatement();
final String selectStatement = options.getSelectStatement();
if (selectStatement == null){
diff --git a/phoenix5-spark3/pom.xml b/phoenix5-spark3/pom.xml
index c53c525..13ff51a 100644
--- a/phoenix5-spark3/pom.xml
+++ b/phoenix5-spark3/pom.xml
@@ -119,6 +119,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>2.5.0</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- Misc dependencies -->
<dependency>
<groupId>joda-time</groupId>
@@ -178,4 +185,4 @@
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java
index 4f7408f..4d49150 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java
@@ -29,9 +29,11 @@
private final String scn;
private final String selectStatement;
private final Properties overriddenProps;
+ private final byte[] pTableCacheBytes;
PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
- String selectStatement, Properties overriddenProps) {
+ String selectStatement, Properties overriddenProps,
+ byte[] pTableCacheBytes) {
if(overriddenProps == null){
throw new NullPointerException();
}
@@ -40,6 +42,7 @@
this.tenantId = tenantId;
this.selectStatement = selectStatement;
this.overriddenProps = overriddenProps;
+ this.pTableCacheBytes = pTableCacheBytes;
}
String getSelectStatement() {
@@ -69,4 +72,8 @@
}
return overriddenProps;
}
+
+ byte[] getPTableCacheBytes() {
+ return pTableCacheBytes;
+ }
}
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java
index 62c88b5..4cbc237 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.spark.sql.connector.reader;
+import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -33,6 +34,7 @@
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.generated.PTableProtos.PTable;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
@@ -41,11 +43,13 @@
import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.iterate.SequenceResultIterator;
import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PTableImpl;
import org.apache.spark.executor.InputMetrics;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;
@@ -79,8 +83,18 @@
private QueryPlan getQueryPlan() throws SQLException {
String zkUrl = options.getZkUrl();
Properties overridingProps = getOverriddenPropsFromOptions();
+ overridingProps.put("phoenix.skip.system.tables.existence.check", Boolean.valueOf("true"));
try (Connection conn = DriverManager.getConnection(
JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overridingProps)) {
+ PTable pTable = null;
+ try {
+ pTable = PTable.parseFrom(options.getPTableCacheBytes());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Parsing the PTable Cache Bytes is failing ", e);
+ }
+ org.apache.phoenix.schema.PTable table = PTableImpl.createFromProto(pTable);
+ PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+ phoenixConnection.addTable(table, System.currentTimeMillis());
final Statement statement = conn.createStatement();
final String selectStatement = options.getSelectStatement();
if (selectStatement == null){
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java
index ecdf7f6..451d7b1 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.spark.sql.connector.reader;
+import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
@@ -146,10 +147,11 @@
// Get the region size
long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
-
+ byte[] pTableCacheBytes = PTableImpl.toProto(queryPlan.getTableRef().getTable()).
+ toByteArray();
phoenixDataSourceOptions =
new PhoenixDataSourceReadOptions(zkUrl, currentScnValue,
- tenantId, selectStatement, overriddenProps);
+ tenantId, selectStatement, overriddenProps, pTableCacheBytes);
if (splitByStats) {
for (org.apache.hadoop.hbase.client.Scan aScan : scans) {