PHOENIX-5059 Use the Datasource v2 api in the spark connector
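
The Spark connector now provides a DataSource v2 implementation registered under the short name "phoenix" (PhoenixDataSource below), and the integration tests are largely switched over to it. A minimal usage sketch of the new API, assuming an existing SparkSession `spark`; the table names and ZooKeeper quorum are illustrative placeholders:

    import org.apache.spark.sql.SaveMode

    // Read a Phoenix table through the v2 connector
    val df = spark.read
      .format("phoenix")
      .option("table", "TABLE1")          // DataSourceOptions.TABLE_KEY
      .option("zkUrl", "localhost:2181")  // PhoenixDataSource.ZOOKEEPER_URL
      .load()

    // Write back out; the v2 writer currently only accepts SaveMode.Overwrite
    df.write
      .format("phoenix")
      .option("table", "TABLE1_COPY")
      .option("zkUrl", "localhost:2181")
      .mode(SaveMode.Overwrite)
      .save()
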
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java
index 3051cd6..ef127ac 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java
@@ -194,7 +194,7 @@
.setSelectColumns(
Lists.newArrayList("A_INTEGER", "A_STRING", "A_ID", "B_STRING", "B_INTEGER"))
.setFullTableName(tableName)
- .setWhereClause("a_integer = 1 AND a_string >= 'ab' AND a_string < 'de' AND a_id = '123'");
+ .setWhereClause("A_INTEGER = 1 AND A_STRING >= 'ab' AND A_STRING < 'de' AND A_ID = '123'");
rs = executeQuery(conn, queryBuilder);
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
@@ -205,7 +205,7 @@
assertFalse(rs.next());
// all single slots with one value.
- queryBuilder.setWhereClause("a_integer = 1 AND a_string = 'ab' AND a_id = '123'");
+ queryBuilder.setWhereClause("A_INTEGER = 1 AND A_STRING = 'ab' AND A_ID = '123'");
rs = executeQuery(conn, queryBuilder);
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
@@ -216,7 +216,7 @@
assertFalse(rs.next());
// all single slots with multiple values.
- queryBuilder.setWhereClause("a_integer in (2, 4) AND a_string = 'abc' AND a_id = '123'");
+ queryBuilder.setWhereClause("A_INTEGER in (2, 4) AND A_STRING = 'abc' AND A_ID = '123'");
rs = executeQuery(conn, queryBuilder);
assertTrue(rs.next());
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index e2790bd..9cc3c3d 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -487,6 +487,14 @@
<testSourceDirectory>src/it/scala</testSourceDirectory>
<testResources><testResource><directory>src/it/resources</directory></testResource></testResources>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
index 83578ba..1257c43 100644
--- a/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
@@ -16,12 +16,13 @@
import java.util.Properties;
import org.apache.phoenix.end2end.BaseOrderByIT;
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryBuilder;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
-import org.junit.Ignore;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.junit.Test;
import com.google.common.collect.Lists;
@@ -31,28 +32,6 @@
public class OrderByIT extends BaseOrderByIT {
- @Ignore(" || operator not supported in order by Spark 1.6 ")
- @Test
- @Override
- public void testDescMultiOrderByExpr() throws Exception {
- super.testDescMultiOrderByExpr();
- }
-
- @Ignore("NULLS FIRST|LAST not supported in Spark 1.6")
- @Test
- @Override
- public void testNullsLastWithDesc() throws Exception {
- super.testNullsLastWithDesc();
- }
-
- @Ignore("NULLS FIRST|LAST not supported in Spark 1.6")
- @Test
- @Override
- public void testOrderByReverseOptimizationWithNullsLast() throws Exception {
- super.testOrderByReverseOptimizationWithNullsLast();
- }
-
-
@Override
protected ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder,
String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) {
@@ -128,25 +107,20 @@
// create two PhoenixRDDs using the table names and columns that are required for the JOIN query
List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D");
- SQLContext sqlContext = SparkUtil.getSqlContext();
- DataFrame phoenixDataSet =
- new PhoenixRDD(SparkUtil.getSparkContext(), tableName1,
- JavaConverters.asScalaBufferConverter(table1Columns).asScala().toSeq(),
- Option.apply((String) null), Option.apply(getUrl()), config, false,
- null).toDataFrame(sqlContext);
- phoenixDataSet.registerTempTable(tableName1);
- List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1");
- phoenixDataSet =
- new PhoenixRDD(SparkUtil.getSparkContext(), tableName2,
- JavaConverters.asScalaBufferConverter(table2Columns).asScala().toSeq(),
- Option.apply((String) null), Option.apply(getUrl()), config, false,
- null).toDataFrame(sqlContext);
- phoenixDataSet.registerTempTable(tableName2);
+ SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+ Dataset<Row> phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+ .option(DataSourceOptions.TABLE_KEY, tableName1)
+ .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+ phoenixDataSet.createOrReplaceTempView(tableName1);
+ phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+ .option(DataSourceOptions.TABLE_KEY, tableName2)
+ .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+ phoenixDataSet.createOrReplaceTempView(tableName2);
String query =
"SELECT T1.* FROM " + tableName1 + " T1 JOIN " + tableName2
+ " T2 ON T1.A_STRING = T2.A_STRING ORDER BY T1.`CF1.B`";
- DataFrame dataset =
+ Dataset<Row> dataset =
sqlContext.sql(query);
List<Row> rows = dataset.collectAsList();
ResultSet rs = new SparkResultSet(rows, dataset.columns());
@@ -175,8 +149,8 @@
assertFalse(rs.next());
query =
- "select t1.a_string, t2.col1 from " + tableName1 + " t1 join " + tableName2
- + " t2 on t1.a_string = t2.a_string order by t2.col1";
+ "SELECT T1.A_STRING, T2.COL1 FROM " + tableName1 + " T1 JOIN " + tableName2
+ + " T2 ON T1.A_STRING = T2.A_STRING ORDER BY T2.COL1";
dataset = sqlContext.sql(query);
rows = dataset.collectAsList();
rs = new SparkResultSet(rows, dataset.columns());
@@ -193,7 +167,6 @@
}
}
- @Ignore("Not passing on CDH 4.15")
@Test
public void testOrderByWithUnionAll() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -249,26 +222,20 @@
conn.commit();
- List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D");
- SQLContext sqlContext = SparkUtil.getSqlContext();
- DataFrame phoenixDataSet =
- new PhoenixRDD(SparkUtil.getSparkContext(), tableName1,
- JavaConverters.asScalaBufferConverter(table1Columns).asScala().toSeq(),
- Option.apply((String) null), Option.apply(getUrl()), config, false,
- null).toDataFrame(sqlContext);
- phoenixDataSet.registerTempTable(tableName1);
- List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1");
- phoenixDataSet =
- new PhoenixRDD(SparkUtil.getSparkContext(), tableName2,
- JavaConverters.asScalaBufferConverter(table2Columns).asScala().toSeq(),
- Option.apply((String) null), Option.apply(getUrl()), config, false,
- null).toDataFrame(sqlContext);
- phoenixDataSet.registerTempTable(tableName2);
+ SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+ Dataset<Row> phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+ .option(DataSourceOptions.TABLE_KEY, tableName1)
+ .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+ phoenixDataSet.createOrReplaceTempView(tableName1);
+ phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+ .option(DataSourceOptions.TABLE_KEY, tableName2)
+ .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+ phoenixDataSet.createOrReplaceTempView(tableName2);
String query =
"select a_string, `cf2.d` from " + tableName1 + " union all select * from "
+ tableName2 + " order by `cf2.d`";
- DataFrame dataset =
+ Dataset<Row> dataset =
sqlContext.sql(query);
List<Row> rows = dataset.collectAsList();
ResultSet rs = new SparkResultSet(rows, dataset.columns());
@@ -330,15 +297,12 @@
stmt.execute();
conn.commit();
- SQLContext sqlContext = SparkUtil.getSqlContext();
- DataFrame phoenixDataSet =
- new PhoenixRDD(SparkUtil.getSparkContext(), tableName,
- JavaConverters.asScalaBufferConverter(Lists.newArrayList("col1", "col2", "col4")).asScala().toSeq(),
- Option.apply((String) null), Option.apply(getUrl()), config, false,
- null).toDataFrame(sqlContext);
-
- phoenixDataSet.registerTempTable(tableName);
- DataFrame dataset =
+ SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+ Dataset<Row> phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+ .option(DataSourceOptions.TABLE_KEY, tableName)
+ .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+ phoenixDataSet.createOrReplaceTempView(tableName);
+ Dataset<Row> dataset =
sqlContext.sql("SELECT col1+col2, col4, a_string FROM " + tableName
+ " ORDER BY col1+col2, col4");
List<Row> rows = dataset.collectAsList();
@@ -395,19 +359,12 @@
conn.commit();
- List<String> columns =
- Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D",
- "COL2");
-
- SQLContext sqlContext = SparkUtil.getSqlContext();
- DataFrame phoenixDataSet =
- new PhoenixRDD(SparkUtil.getSparkContext(), tableName,
- JavaConverters.asScalaBufferConverter(columns).asScala().toSeq(),
- Option.apply((String) null), Option.apply(url), config, false, null)
- .toDataFrame(sqlContext);
-
- phoenixDataSet.registerTempTable(tableName);
- DataFrame dataset =
+ SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+ Dataset<Row> phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+ .option(DataSourceOptions.TABLE_KEY, tableName)
+ .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+ phoenixDataSet.createOrReplaceTempView(tableName);
+ Dataset<Row> dataset =
sqlContext.sql("SELECT A_STRING, `CF1.A`, `CF1.B`, COL1, `CF2.C`, `CF2.D`, COL2 from "
+ tableName + " ORDER BY `CF1.A`,`CF2.C`");
List<Row> rows = dataset.collectAsList();
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
index db2fe1a..668c3c8 100644
--- a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
@@ -22,13 +22,14 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
import org.apache.phoenix.util.QueryBuilder;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
import scala.Option;
import scala.collection.JavaConverters;
@@ -42,33 +43,15 @@
public static final String APP_NAME = "Java Spark Tests";
public static final String NUM_EXECUTORS = "local[2]";
public static final String UI_SHOW_CONSOLE_PROGRESS = "spark.ui.showConsoleProgress";
- public static final String CASE_SENSITIVE_COLUMNS = "spark.sql.caseSensitive";
- private static SparkContext sparkContext = null;
- private static SQLContext sqlContext = null;
-
- public static SparkContext getSparkContext() {
- if (sparkContext == null) {
- SparkConf conf = new SparkConf(true);
- conf.setAppName(APP_NAME);
- conf.setMaster(NUM_EXECUTORS);
- conf.set(UI_SHOW_CONSOLE_PROGRESS, "false");
- conf.set(CASE_SENSITIVE_COLUMNS, "false");
- sparkContext = new SparkContext(conf);
- }
- return sparkContext;
- }
-
- public static SQLContext getSqlContext() {
- if (sqlContext == null) {
- sqlContext = new SQLContext(getSparkContext());
- }
- return sqlContext;
+ public static SparkSession getSparkSession() {
+ return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS)
+ .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate();
}
public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder, String url, Configuration config)
throws SQLException {
- SQLContext sqlContext = SparkUtil.getSqlContext();
+ SQLContext sqlContext = getSparkSession().sqlContext();
boolean forceRowKeyOrder =
conn.unwrap(PhoenixConnection.class).getQueryServices().getProps()
@@ -82,14 +65,12 @@
// create PhoenixRDD using the table name and columns that are required by the query
// since we don't set the predicate filtering is done after rows are returned from spark
- DataFrame phoenixDataSet =
- new PhoenixRDD(SparkUtil.getSparkContext(), queryBuilder.getFullTableName(),
- JavaConverters.asScalaBufferConverter(queryBuilder.getRequiredColumns()).asScala().toSeq(),
- Option.apply((String) null), Option.apply(url), config, false,
- null).toDataFrame(sqlContext);
+ Dataset<Row> phoenixDataSet = getSparkSession().read().format("phoenix")
+ .option(DataSourceOptions.TABLE_KEY, queryBuilder.getFullTableName())
+ .option(PhoenixDataSource.ZOOKEEPER_URL, url).load();
- phoenixDataSet.registerTempTable(queryBuilder.getFullTableName());
- DataFrame dataset = sqlContext.sql(queryBuilder.build());
+ phoenixDataSet.createOrReplaceTempView(queryBuilder.getFullTableName());
+ Dataset<Row> dataset = sqlContext.sql(queryBuilder.build());
SparkPlan plan = dataset.queryExecution().executedPlan();
List<Row> rows = dataset.collectAsList();
queryBuilder.setOrderByClause(prevOrderBy);
diff --git a/phoenix-spark/src/it/resources/globalSetup.sql b/phoenix-spark/src/it/resources/globalSetup.sql
index 7ac0039..efdb8cb 100644
--- a/phoenix-spark/src/it/resources/globalSetup.sql
+++ b/phoenix-spark/src/it/resources/globalSetup.sql
@@ -26,9 +26,9 @@
UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (4, 2, 'test_child_2')
UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (5, 2, 'test_child_3')
UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (6, 2, 'test_child_4')
-CREATE TABLE "table3" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR)
-UPSERT INTO "table3" ("id", "col1") VALUES (1, 'foo')
-UPSERT INTO "table3" ("id", "col1") VALUES (2, 'bar')
+CREATE TABLE "table4" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR)
+UPSERT INTO "table4" ("id", "col1") VALUES (1, 'foo')
+UPSERT INTO "table4" ("id", "col1") VALUES (2, 'bar')
CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])
UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
CREATE TABLE ARRAYBUFFER_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[], INTARRAY INTEGER[])
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
index ca3470f..a9c2070 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
@@ -19,6 +19,7 @@
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
import org.apache.phoenix.query.BaseTest
import org.apache.phoenix.util.PhoenixRuntime
+import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite, Matchers}
@@ -50,7 +51,7 @@
final val TenantId = "theTenant"
var conn: Connection = _
- var sc: SparkContext = _
+ var spark: SparkSession = _
lazy val hbaseConfiguration = {
val conf = PhoenixSparkITHelper.getTestClusterConfig
@@ -99,12 +100,17 @@
.setMaster("local[2]") // 2 threads, some parallelism
.set("spark.ui.showConsoleProgress", "false") // Disable printing stage progress
- sc = new SparkContext(conf)
+ spark = SparkSession
+ .builder()
+ .appName("PhoenixSparkIT")
+ .master("local[2]") // 2 threads, some parallelism
+ .config("spark.ui.showConsoleProgress", "false")
+ .getOrCreate()
}
override def afterAll() {
conn.close()
- sc.stop()
+ spark.stop()
PhoenixSparkITHelper.cleanUpAfterTest()
PhoenixSparkITHelper.doTeardown
}
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index fb4bb64..6bef721 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -13,16 +13,16 @@
*/
package org.apache.phoenix.spark
+import java.sql.DriverManager
import java.util.Date
import org.apache.phoenix.schema.types.PVarchar
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Row, SQLContext, SaveMode}
-import org.joda.time.DateTime
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{Row, SaveMode}
+
import scala.collection.mutable.ListBuffer
-import org.apache.hadoop.conf.Configuration
/**
* Note: If running directly from an IDE, these are the recommended VM parameters:
@@ -30,13 +30,20 @@
*/
class PhoenixSparkIT extends AbstractPhoenixSparkIT {
- test("Can persist data with case senstive columns (like in avro schema) using 'DataFrame.saveToPhoenix'") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.createDataFrame(
+ test("Can persist data with case sensitive columns (like in avro schema)") {
+ val df = spark.createDataFrame(
Seq(
(1, 1, "test_child_1"),
- (2, 1, "test_child_2"))).toDF("ID", "TABLE3_ID", "t2col1")
- df.saveToPhoenix("TABLE3", zkUrl = Some(quorumAddress),skipNormalizingIdentifier=true)
+ (2, 1, "test_child_2"))).
+ // column names are case sensitive
+ toDF("ID", "TABLE3_ID", "t2col1")
+ df.write
+ .format("phoenix")
+ .options(Map("table" -> "TABLE3",
+ PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true"))
+ .mode(SaveMode.Overwrite)
+ .save()
+
// Verify results
val stmt = conn.createStatement()
@@ -50,37 +57,55 @@
stmt.close()
results.toList shouldEqual checkResults
-
}
-
+
+ // INSERT is not supported using the DataSource v2 API yet
+ ignore("Can write data using spark SQL INSERT") {
+ val df1 = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> "TABLE3", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+ df1.createOrReplaceTempView("TABLE3")
+
+ // Insert data
+ spark.sql("INSERT INTO TABLE3 VALUES(10, 10, 10)")
+ spark.sql("INSERT INTO TABLE3 VALUES(20, 20, 20)")
+
+ // Verify results
+ val stmt = conn.createStatement()
+ val rs = stmt.executeQuery("SELECT * FROM TABLE3 WHERE ID>=10")
+ val expectedResults = List((10, 10, "10"), (20, 20, "20"))
+ val results = ListBuffer[(Long, Long, String)]()
+ while (rs.next()) {
+ results.append((rs.getLong(1), rs.getLong(2), rs.getString(3)))
+ }
+ stmt.close()
+
+ results.toList shouldEqual expectedResults
+ }
+
test("Can convert Phoenix schema") {
val phoenixSchema = List(
new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
)
- val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
- conf = hbaseConfiguration)
+ val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema)
- val catalystSchema = rdd.phoenixSchemaToCatalystSchema(phoenixSchema)
-
- val expected = List(StructField("varcharColumn", StringType, nullable = true))
+ val expected = new StructType(List(StructField("varcharColumn", StringType, nullable = true)).toArray)
catalystSchema shouldEqual expected
}
test("Can create schema RDD and execute query") {
- val sqlContext = new SQLContext(sc)
+ val df1 = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
- val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
+ df1.createOrReplaceTempView("sql_table_1")
- df1.registerTempTable("sql_table_1")
+ val df2 = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
- val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
- conf = hbaseConfiguration)
+ df2.createOrReplaceTempView("sql_table_2")
- df2.registerTempTable("sql_table_2")
-
- val sqlRdd = sqlContext.sql(
+ val sqlRdd = spark.sql(
"""
|SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
|INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
@@ -91,18 +116,49 @@
count shouldEqual 6L
}
+ ignore("Ordering by pk columns should not require sorting") {
+ val df1 = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+ df1.createOrReplaceTempView("TABLE1")
+
+ val sqlRdd = spark.sql("SELECT * FROM TABLE1 ORDER BY ID, COL1")
+ val plan = sqlRdd.queryExecution.sparkPlan
+ // verify the spark plan doesn't have a sort
+ assert(!plan.toString.contains("Sort"))
+
+ val expectedResults = Array(Row.fromSeq(Seq(1, "test_row_1")), Row.fromSeq(Seq(2, "test_row_2")))
+ val actual = sqlRdd.collect()
+
+ actual shouldEqual expectedResults
+ }
+
+ test("Verify correct number of partitions are created") {
+ val conn = DriverManager.getConnection(PhoenixSparkITHelper.getUrl)
+ val ddl = "CREATE TABLE SPLIT_TABLE (id VARCHAR NOT NULL PRIMARY KEY, val VARCHAR) split on ('e','j','o')"
+ conn.createStatement.execute(ddl)
+ val keys = Array("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s",
+ "t", "u", "v", "w", "x", "y", "z")
+ for (key <- keys) {
+ conn.createStatement.execute("UPSERT INTO SPLIT_TABLE VALUES('" + key + "', '" + key + "')")
+ }
+ conn.commit()
+
+ val df1 = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> "SPLIT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+ df1.createOrReplaceTempView("SPLIT_TABLE")
+ val sqlRdd = spark.sql("SELECT * FROM SPLIT_TABLE")
+ val numPartitions = sqlRdd.rdd.partitions.size
+
+ numPartitions shouldEqual 4
+ }
+
test("Can create schema RDD and execute query on case sensitive table (no config)") {
- val sqlContext = new SQLContext(sc)
+ val df1 = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> SchemaUtil.getEscapedArgument("table4"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+ df1.createOrReplaceTempView("table4")
- val df1 = sqlContext.phoenixTableAsDataFrame(
- SchemaUtil.getEscapedArgument("table3"),
- Array("id", "col1"),
- zkUrl = Some(quorumAddress))
-
- df1.registerTempTable("table3")
-
- val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+ val sqlRdd = spark.sql("SELECT id FROM table4")
val count = sqlRdd.count()
@@ -110,20 +166,17 @@
}
test("Can create schema RDD and execute constrained query") {
- val sqlContext = new SQLContext(sc)
+ val df1 = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
- val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"),
- conf = hbaseConfiguration)
+ df1.createOrReplaceTempView("sql_table_1")
- df1.registerTempTable("sql_table_1")
+ val df2 = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load.filter("ID = 1")
- val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
- predicate = Some("\"ID\" = 1"),
- conf = hbaseConfiguration)
+ df2.createOrReplaceTempView("sql_table_2")
- df2.registerTempTable("sql_table_2")
-
- val sqlRdd = sqlContext.sql(
+ val sqlRdd = spark.sql(
"""
|SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
|INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
@@ -135,17 +188,12 @@
}
test("Can create schema RDD with predicate that will never match") {
- val sqlContext = new SQLContext(sc)
+ val df1 = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load.filter("ID = -1")
- val df1 = sqlContext.phoenixTableAsDataFrame(
- SchemaUtil.getEscapedArgument("table3"),
- Array("id", "col1"),
- predicate = Some("\"id\" = -1"),
- conf = hbaseConfiguration)
+ df1.createOrReplaceTempView("table3")
- df1.registerTempTable("table3")
-
- val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+ val sqlRdd = spark.sql("SELECT * FROM table3")
val count = sqlRdd.count()
@@ -153,21 +201,17 @@
}
test("Can create schema RDD with complex predicate") {
- val sqlContext = new SQLContext(sc)
+ val predicate = "ID > 0 AND TIMESERIES_KEY BETWEEN " +
+ "CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND " +
+ "CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)"
+ val df1 = spark.sqlContext.read.format("phoenix")
+ .options(Map("table" -> "DATE_PREDICATE_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+ .load
+ .filter(predicate)
- val df1 = sqlContext.phoenixTableAsDataFrame(
- "DATE_PREDICATE_TEST_TABLE",
- Array("ID", "TIMESERIES_KEY"),
- predicate = Some(
- """
- |ID > 0 AND TIMESERIES_KEY BETWEEN
- |CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND
- |CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)""".stripMargin),
- conf = hbaseConfiguration)
+ df1.createOrReplaceTempView("date_predicate_test_table")
- df1.registerTempTable("date_predicate_test_table")
-
- val sqlRdd = df1.sqlContext.sql("SELECT * FROM date_predicate_test_table")
+ val sqlRdd = spark.sqlContext.sql("SELECT * FROM date_predicate_test_table")
val count = sqlRdd.count()
@@ -175,14 +219,12 @@
}
test("Can query an array table") {
- val sqlContext = new SQLContext(sc)
+ val df1 = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> "ARRAY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
- val df1 = sqlContext.phoenixTableAsDataFrame("ARRAY_TEST_TABLE", Array("ID", "VCARRAY"),
- conf = hbaseConfiguration)
+ df1.createOrReplaceTempView("ARRAY_TEST_TABLE")
- df1.registerTempTable("ARRAY_TEST_TABLE")
-
- val sqlRdd = sqlContext.sql("SELECT * FROM ARRAY_TEST_TABLE")
+ val sqlRdd = spark.sql("SELECT * FROM ARRAY_TEST_TABLE")
val count = sqlRdd.count()
@@ -195,12 +237,12 @@
}
test("Can read a table as an RDD") {
- val rdd1 = sc.phoenixTableAsRDD("ARRAY_TEST_TABLE", Seq("ID", "VCARRAY"),
- conf = hbaseConfiguration)
+ val rdd1 = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> "ARRAY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
val count = rdd1.count()
- val arrayValues = rdd1.take(1)(0)("VCARRAY")
+ val arrayValues = rdd1.take(1)(0)(1)
arrayValues should equal(Array("String1", "String2", "String3"))
@@ -208,24 +250,30 @@
}
test("Can save to phoenix table") {
- val sqlContext = new SQLContext(sc)
+ val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
- val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+ val schema = StructType(
+ Seq(StructField("ID", LongType, nullable = false),
+ StructField("COL1", StringType),
+ StructField("COL2", IntegerType)))
- sc
- .parallelize(dataSet)
- .saveToPhoenix(
- "OUTPUT_TEST_TABLE",
- Seq("ID", "COL1", "COL2"),
- hbaseConfiguration
- )
+ val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+ // Apply the schema to the RDD.
+ val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+ df.write
+ .format("phoenix")
+ .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+ .mode(SaveMode.Overwrite)
+ .save()
// Load the results back
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
- val results = ListBuffer[(Long, String, Int)]()
+ val results = ListBuffer[Row]()
while (rs.next()) {
- results.append((rs.getLong(1), rs.getString(2), rs.getInt(3)))
+ results.append(Row(rs.getLong(1), rs.getString(2), rs.getInt(3)))
}
// Verify they match
@@ -234,18 +282,29 @@
}
}
- test("Can save Java and Joda dates to Phoenix (no config)") {
- val dt = new DateTime()
- val date = new Date()
+ test("Can save dates to Phoenix using java.sql.Date") {
+ val date = java.sql.Date.valueOf("2016-09-30")
- val dataSet = List((1L, "1", 1, dt), (2L, "2", 2, date))
- sc
- .parallelize(dataSet)
- .saveToPhoenix(
- "OUTPUT_TEST_TABLE",
- Seq("ID", "COL1", "COL2", "COL3"),
- zkUrl = Some(quorumAddress)
- )
+ // Since we are creating a Row we have to use java.sql.Date;
+ // java.util.Date and Joda-Time DateTime are not supported
+ val dataSet = Seq(Row(1L, "1", 1, date), Row(2L, "2", 2, date))
+
+ val schema = StructType(
+ Seq(StructField("ID", LongType, nullable = false),
+ StructField("COL1", StringType),
+ StructField("COL2", IntegerType),
+ StructField("COL3", DateType)))
+
+ val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+ // Apply the schema to the RDD.
+ val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+ df.write
+ .format("phoenix")
+ .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+ .mode(SaveMode.Overwrite)
+ .save()
// Load the results back
val stmt = conn.createStatement()
@@ -256,94 +315,56 @@
}
// Verify the epochs are equal
- results(0).getTime shouldEqual dt.getMillis
+ results(0).getTime shouldEqual date.getTime
results(1).getTime shouldEqual date.getTime
}
test("Can infer schema without defining columns") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.phoenixTableAsDataFrame("TABLE2", Seq(), conf = hbaseConfiguration)
+ val df = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
df.schema("ID").dataType shouldEqual LongType
df.schema("TABLE1_ID").dataType shouldEqual LongType
df.schema("t2col1").dataType shouldEqual StringType
}
test("Spark SQL can use Phoenix as a data source with no schema specified") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
- "zkUrl" -> quorumAddress))
+ val df = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
df.count() shouldEqual 2
df.schema("ID").dataType shouldEqual LongType
df.schema("COL1").dataType shouldEqual StringType
}
- ignore("Spark SQL can use Phoenix as a data source with PrunedFilteredScan") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
- "zkUrl" -> quorumAddress))
+ test("Datasource v2 pushes down filters") {
+ val df = spark.sqlContext.read.format("phoenix")
+ .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
val res = df.filter(df("COL1") === "test_row_1" && df("ID") === 1L).select(df("ID"))
// Make sure we got the right value back
assert(res.first().getLong(0) == 1L)
val plan = res.queryExecution.sparkPlan
- // filters should be pushed into phoenix relation
- assert(plan.toString.contains("PushedFilters: [*IsNotNull(COL1), *IsNotNull(ID), " +
- "*EqualTo(COL1,test_row_1), *EqualTo(ID,1)]"))
- // spark should run the filters on the rows returned by Phoenix
- assert(!plan.toString.matches(".*Filter (((isnotnull(COL1.*) && isnotnull(ID.*)) "
- + " && (COL1.* = test_row_1)) && (ID.* = 1)).*"))
+ // filters should be pushed into scan
+ assert(".*ScanV2 phoenix.*Filters.*ID.*COL1.*".r.findFirstIn(plan.toString).isDefined)
+ // spark should not do post scan filtering
+ assert(".*Filter .*ID.*COL1.*".r.findFirstIn(plan.toString).isEmpty)
}
- test("Can persist a dataframe using 'DataFrame.saveToPhoenix'") {
+ test("Can persist a dataframe") {
// Load from TABLE1
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
- "zkUrl" -> quorumAddress))
+ val df = spark.sqlContext.read.format("phoenix").options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
// Save to TABLE1_COPY
- df.saveToPhoenix("TABLE1_COPY", zkUrl = Some(quorumAddress))
-
- // Verify results
- val stmt = conn.createStatement()
- val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY")
-
- val checkResults = List((1L, "test_row_1"), (2, "test_row_2"))
- val results = ListBuffer[(Long, String)]()
- while (rs.next()) {
- results.append((rs.getLong(1), rs.getString(2)))
- }
- stmt.close()
-
- results.toList shouldEqual checkResults
- }
-
- test("Can persist a dataframe using 'DataFrame.save()") {
- // Clear TABLE1_COPY
- var stmt = conn.createStatement()
- stmt.executeUpdate("DELETE FROM TABLE1_COPY")
- stmt.close()
-
- // Load TABLE1, save as TABLE1_COPY
- val sqlContext = new SQLContext(sc)
- val df = sqlContext
- .read
- .format("org.apache.phoenix.spark")
- .option("table", "TABLE1")
- .option("zkUrl", quorumAddress)
- .load()
-
- // Save to TABLE21_COPY
df
.write
- .format("org.apache.phoenix.spark")
+ .format("phoenix")
.mode(SaveMode.Overwrite)
.option("table", "TABLE1_COPY")
- .option("zkUrl", quorumAddress)
+ .option(PhoenixDataSource.ZOOKEEPER_URL, quorumAddress)
.save()
// Verify results
- stmt = conn.createStatement()
+ val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY")
val checkResults = List((1L, "test_row_1"), (2, "test_row_2"))
@@ -357,15 +378,22 @@
}
test("Can save arrays back to phoenix") {
- val dataSet = List((2L, Array("String1", "String2", "String3")))
+ val dataSet = List(Row(2L, Array("String1", "String2", "String3")))
+ val schema = StructType(Seq(
+ StructField("ID", LongType, nullable = false),
+ StructField("VCARRAY", ArrayType(StringType, true))
+ ))
- sc
- .parallelize(dataSet)
- .saveToPhoenix(
- "ARRAY_TEST_TABLE",
- Seq("ID", "VCARRAY"),
- zkUrl = Some(quorumAddress)
- )
+ val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+ // Apply the schema to the RDD.
+ val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+ df.write
+ .format("phoenix")
+ .options(Map("table" -> "ARRAY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+ .mode(SaveMode.Overwrite)
+ .save()
// Load the results back
val stmt = conn.createStatement()
@@ -374,57 +402,54 @@
val sqlArray = rs.getArray(1).getArray().asInstanceOf[Array[String]]
// Verify the arrays are equal
- sqlArray shouldEqual dataSet(0)._2
+ sqlArray shouldEqual dataSet(0).get(1)
}
test("Can read from table with schema and escaped table name") {
// Manually escape
- val rdd1 = sc.phoenixTableAsRDD(
- "CUSTOM_ENTITY.\"z02\"",
- Seq("ID"),
- conf = hbaseConfiguration)
+ val df1 = spark.sqlContext.read.format("phoenix")
+ .options(Map("table" -> "CUSTOM_ENTITY.\"z02\"", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
- var count = rdd1.count()
+ var count = df1.count()
count shouldEqual 1L
// Use SchemaUtil
- val rdd2 = sc.phoenixTableAsRDD(
- SchemaUtil.getEscapedFullTableName("CUSTOM_ENTITY.z02"),
- Seq("ID"),
- conf = hbaseConfiguration)
+ val df2 = spark.sqlContext.read.format("phoenix")
+ .options(
+ Map("table" -> SchemaUtil.getEscapedFullTableName("CUSTOM_ENTITY.z02"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+ .load()
- count = rdd2.count()
+ count = df2.count()
count shouldEqual 1L
-
}
test("Ensure DataFrame field normalization (PHOENIX-2196)") {
- val rdd1 = sc
+ val rdd1 = spark.sparkContext
.parallelize(Seq((1L, 1L, "One"), (2L, 2L, "Two")))
.map(p => Row(p._1, p._2, p._3))
- val sqlContext = new SQLContext(sc)
-
val schema = StructType(Seq(
StructField("id", LongType, nullable = false),
StructField("table1_id", LongType, nullable = true),
StructField("\"t2col1\"", StringType, nullable = true)
))
- val df = sqlContext.createDataFrame(rdd1, schema)
+ val df = spark.sqlContext.createDataFrame(rdd1, schema)
- df.saveToPhoenix("TABLE2", zkUrl = Some(quorumAddress))
+ df.write
+ .format("phoenix")
+ .options(Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+ .mode(SaveMode.Overwrite)
+ .save()
}
test("Ensure Dataframe supports LIKE and IN filters (PHOENIX-2328)") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
- "zkUrl" -> quorumAddress))
-
+ val df = spark.sqlContext.read.format("phoenix").options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
// Prefix match
val res1 = df.filter("COL1 like 'test_row_%'")
+ val plan = res1.groupBy().count().queryExecution.sparkPlan
res1.count() shouldEqual 2
// Suffix match
@@ -463,14 +488,14 @@
}
test("Can load decimal types with accurate precision and scale (PHOENIX-2288)") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TEST_DECIMAL", "zkUrl" -> quorumAddress))
+ val df = spark.sqlContext.read.format("phoenix")
+ .options(Map("table" -> "TEST_DECIMAL", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
assert(df.select("COL1").first().getDecimal(0) == BigDecimal("123.456789").bigDecimal)
}
- test("Can load small and tiny integeger types (PHOENIX-2426)") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TEST_SMALL_TINY", "zkUrl" -> quorumAddress))
+ test("Can load small and tiny integer types (PHOENIX-2426)") {
+ val df = spark.sqlContext.read.format("phoenix")
+ .options(Map("table" -> "TEST_SMALL_TINY", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
assert(df.select("COL1").first().getShort(0).toInt == 32767)
assert(df.select("COL2").first().getByte(0).toInt == 127)
}
@@ -478,21 +503,19 @@
test("Can save arrays from custom dataframes back to phoenix") {
val dataSet = List(Row(2L, Array("String1", "String2", "String3"), Array(1, 2, 3)))
- val sqlContext = new SQLContext(sc)
-
val schema = StructType(
Seq(StructField("ID", LongType, nullable = false),
StructField("VCARRAY", ArrayType(StringType)),
StructField("INTARRAY", ArrayType(IntegerType))))
- val rowRDD = sc.parallelize(dataSet)
+ val rowRDD = spark.sparkContext.parallelize(dataSet)
// Apply the schema to the RDD.
- val df = sqlContext.createDataFrame(rowRDD, schema)
+ val df = spark.sqlContext.createDataFrame(rowRDD, schema)
df.write
- .format("org.apache.phoenix.spark")
- .options(Map("table" -> "ARRAYBUFFER_TEST_TABLE", "zkUrl" -> quorumAddress))
+ .format("phoenix")
+ .options(Map("table" -> "ARRAYBUFFER_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
.mode(SaveMode.Overwrite)
.save()
@@ -509,15 +532,23 @@
}
test("Can save arrays of AnyVal type back to phoenix") {
- val dataSet = List((2L, Array(1, 2, 3), Array(1L, 2L, 3L)))
+ val dataSet = List(Row(2L, Array(1, 2, 3), Array(1L, 2L, 3L)))
- sc
- .parallelize(dataSet)
- .saveToPhoenix(
- "ARRAY_ANYVAL_TEST_TABLE",
- Seq("ID", "INTARRAY", "BIGINTARRAY"),
- zkUrl = Some(quorumAddress)
- )
+ val schema = StructType(
+ Seq(StructField("ID", LongType, nullable = false),
+ StructField("INTARRAY", ArrayType(IntegerType)),
+ StructField("BIGINTARRAY", ArrayType(LongType))))
+
+ val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+ // Apply the schema to the RDD.
+ val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+ df.write
+ .format("phoenix")
+ .options(Map("table" -> "ARRAY_ANYVAL_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+ .mode(SaveMode.Overwrite)
+ .save()
// Load the results back
val stmt = conn.createStatement()
@@ -527,20 +558,27 @@
val longArray = rs.getArray(2).getArray().asInstanceOf[Array[Long]]
// Verify the arrays are equal
- intArray shouldEqual dataSet(0)._2
- longArray shouldEqual dataSet(0)._3
+ intArray shouldEqual dataSet(0).get(1)
+ longArray shouldEqual dataSet(0).get(2)
}
test("Can save arrays of Byte type back to phoenix") {
- val dataSet = List((2L, Array(1.toByte, 2.toByte, 3.toByte)))
+ val dataSet = List(Row(2L, Array(1.toByte, 2.toByte, 3.toByte)))
- sc
- .parallelize(dataSet)
- .saveToPhoenix(
- "ARRAY_BYTE_TEST_TABLE",
- Seq("ID", "BYTEARRAY"),
- zkUrl = Some(quorumAddress)
- )
+ val schema = StructType(
+ Seq(StructField("ID", LongType, nullable = false),
+ StructField("BYTEARRAY", ArrayType(ByteType))))
+
+ val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+ // Apply the schema to the RDD.
+ val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+ df.write
+ .format("phoenix")
+ .options(Map("table" -> "ARRAY_BYTE_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+ .mode(SaveMode.Overwrite)
+ .save()
// Load the results back
val stmt = conn.createStatement()
@@ -549,19 +587,28 @@
val byteArray = rs.getArray(1).getArray().asInstanceOf[Array[Byte]]
// Verify the arrays are equal
- byteArray shouldEqual dataSet(0)._2
+ byteArray shouldEqual dataSet(0).get(1)
}
test("Can save binary types back to phoenix") {
- val dataSet = List((2L, Array[Byte](1), Array[Byte](1, 2, 3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2))))
+ val dataSet = List(Row(2L, Array[Byte](1), Array[Byte](1, 2, 3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2))))
- sc
- .parallelize(dataSet)
- .saveToPhoenix(
- "VARBINARY_TEST_TABLE",
- Seq("ID", "BIN", "VARBIN", "BINARRAY"),
- zkUrl = Some(quorumAddress)
- )
+ val schema = StructType(
+ Seq(StructField("ID", LongType, false),
+ StructField("BIN", BinaryType),
+ StructField("VARBIN", BinaryType),
+ StructField("BINARRAY", ArrayType(BinaryType))))
+
+ val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+ // Apply the schema to the RDD.
+ val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+ df.write
+ .format("phoenix")
+ .options(Map("table" -> "VARBINARY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+ .mode(SaveMode.Overwrite)
+ .save()
// Load the results back
val stmt = conn.createStatement()
@@ -572,16 +619,15 @@
val varByteArray = rs.getArray("BINARRAY").getArray().asInstanceOf[Array[Array[Byte]]]
// Verify the arrays are equal
- byte shouldEqual dataSet(0)._2
- varByte shouldEqual dataSet(0)._3
- varByteArray shouldEqual dataSet(0)._4
+ byte shouldEqual dataSet(0).get(1)
+ varByte shouldEqual dataSet(0).get(2)
+ varByteArray shouldEqual dataSet(0).get(3)
}
test("Can load Phoenix DATE columns through DataFrame API") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.read
- .format("org.apache.phoenix.spark")
- .options(Map("table" -> "DATE_TEST", "zkUrl" -> quorumAddress))
+ val df = spark.sqlContext.read
+ .format("phoenix")
+ .options(Map("table" -> "DATE_TEST", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
.load
val dt = df.select("COL1").first().getDate(0).getTime
val epoch = new Date().getTime
@@ -595,37 +641,37 @@
}
test("Filter operation doesn't work for column names containing a white space (PHOENIX-2547)") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("space"),
- "zkUrl" -> quorumAddress))
+ val df = spark.sqlContext.read.format("phoenix")
+ .options(Map("table" -> SchemaUtil.getEscapedArgument("space"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+ .load
val res = df.filter(df.col("first name").equalTo("xyz"))
// Make sure we got the right value back
assert(res.collectAsList().size() == 1L)
}
test("Spark Phoenix cannot recognize Phoenix view fields (PHOENIX-2290)") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"),
- "zkUrl" -> quorumAddress))
- df.registerTempTable("temp")
+ val df = spark.sqlContext.read.format("phoenix")
+ .options(Map("table" -> SchemaUtil.getEscapedArgument("small"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+ .load
+ df.createOrReplaceTempView("temp")
// limitation: filter / where expressions are not allowed with "double quotes", instead of that pass it as column expressions
// reason: if the expression contains "double quotes" then spark sql parser, ignoring evaluating .. giving to next level to handle
- val res1 = sqlContext.sql("select * from temp where salary = '10000' ")
+ val res1 = spark.sql("select * from temp where salary = '10000' ")
assert(res1.collectAsList().size() == 1L)
- val res2 = sqlContext.sql("select * from temp where \"salary\" = '10000' ")
+ val res2 = spark.sql("select * from temp where \"salary\" = '10000' ")
assert(res2.collectAsList().size() == 0L)
- val res3 = sqlContext.sql("select * from temp where salary > '10000' ")
+ val res3 = spark.sql("select * from temp where salary > '10000' ")
assert(res3.collectAsList().size() == 2L)
}
test("Queries with small case column-names return empty result-set when working with Spark Datasource Plugin (PHOENIX-2336)") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"),
- "zkUrl" -> quorumAddress))
+ val df = spark.sqlContext.read.format("phoenix")
+ .options(Map("table" -> SchemaUtil.getEscapedArgument("small"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+ .load
// limitation: filter / where expressions are not allowed with "double quotes", instead of that pass it as column expressions
// reason: if the expression contains "double quotes" then spark sql parser, ignoring evaluating .. giving to next level to handle
@@ -644,10 +690,9 @@
}
test("Can coerce Phoenix DATE columns to TIMESTAMP through DataFrame API") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.read
- .format("org.apache.phoenix.spark")
- .options(Map("table" -> "DATE_TEST", "zkUrl" -> quorumAddress, "dateAsTimestamp" -> "true"))
+ val df = spark.sqlContext.read
+ .format("phoenix")
+ .options(Map("table" -> "DATE_TEST", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, "dateAsTimestamp" -> "true"))
.load
val dtRes = df.select("COL1").first()
val ts = dtRes.getTimestamp(0).getTime
@@ -657,10 +702,9 @@
}
test("Can load Phoenix Time columns through DataFrame API") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.read
- .format("org.apache.phoenix.spark")
- .options(Map("table" -> "TIME_TEST", "zkUrl" -> quorumAddress))
+ val df = spark.sqlContext.read
+ .format("phoenix")
+ .options(Map("table" -> "TIME_TEST", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
.load
val time = df.select("COL1").first().getTimestamp(0).getTime
val epoch = new Date().getTime
@@ -668,13 +712,14 @@
}
test("can read all Phoenix data types") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "GIGANTIC_TABLE",
- "zkUrl" -> quorumAddress))
+ val df = spark.sqlContext.read
+ .format("phoenix")
+ .options(Map("table" -> "GIGANTIC_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+ .load
df.write
- .format("org.apache.phoenix.spark")
- .options(Map("table" -> "OUTPUT_GIGANTIC_TABLE", "zkUrl" -> quorumAddress))
+ .format("phoenix")
+ .options(Map("table" -> "OUTPUT_GIGANTIC_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
.mode(SaveMode.Overwrite)
.save()
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
index 77b41af..291ea2a 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
@@ -64,8 +64,7 @@
/*****************/
test("Can read from tenant-specific table as DataFrame") {
- val sqlContext = new SQLContext(sc)
- val df = sqlContext.phoenixTableAsDataFrame(
+ val df = spark.sqlContext.phoenixTableAsDataFrame(
TenantTable,
Seq(OrgIdCol, TenantOnlyCol),
zkUrl = Some(quorumAddress),
@@ -78,7 +77,7 @@
}
test("Can read from tenant-specific table as RDD") {
- val rdd = sc.phoenixTableAsRDD(
+ val rdd = spark.sparkContext.phoenixTableAsRDD(
TenantTable,
Seq(OrgIdCol, TenantOnlyCol),
zkUrl = Some(quorumAddress),
@@ -95,23 +94,23 @@
/*****************/
test("Can write a DataFrame using 'DataFrame.saveToPhoenix' to tenant-specific view") {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = spark.sqlContext
import sqlContext.implicits._
- val df = sc.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol)
+ val df = spark.sparkContext.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol)
df.saveToPhoenix(TenantTable, zkUrl = Some(quorumAddress), tenantId = Some(TenantId))
verifyResults
}
test("Can write a DataFrame using 'DataFrame.write' to tenant-specific view") {
- val sqlContext = new SQLContext(sc)
+ val sqlContext = spark.sqlContext
import sqlContext.implicits._
- val df = sc.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol)
+ val df = spark.sparkContext.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol)
df.write
- .format("org.apache.phoenix.spark")
+ .format("phoenix")
.mode("overwrite")
.option("table", TenantTable)
.option(PhoenixRuntime.TENANT_ID_ATTRIB, TenantId)
@@ -122,8 +121,7 @@
}
test("Can write an RDD to Phoenix tenant-specific view") {
- val sqlContext = new SQLContext(sc)
- sc
+ spark.sparkContext
.parallelize(TestDataSet)
.saveToPhoenix(
TenantTable,
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
new file mode 100644
index 0000000..ad79d1c
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2;
+
+import java.util.Optional;
+
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader;
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriteOptions;
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDatasourceWriter;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Implements the DataSourceV2 API to read from and write to Phoenix tables
+ */
+public class PhoenixDataSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister {
+
+ public static final String SKIP_NORMALIZING_IDENTIFIER = "skipNormalizingIdentifier";
+ public static final String ZOOKEEPER_URL = "zkUrl";
+
+ @Override
+ public DataSourceReader createReader(DataSourceOptions options) {
+ return new PhoenixDataSourceReader(options);
+ }
+
+ @Override
+ public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode,
+ DataSourceOptions options) {
+ if (!mode.equals(SaveMode.Overwrite)) {
+ throw new RuntimeException("SaveMode other than SaveMode.Overwrite is not supported");
+ }
+ if (!options.tableName().isPresent()) {
+ throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
+ }
+ if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
+ throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
+ }
+
+ PhoenixDataSourceWriteOptions writeOptions = createPhoenixDataSourceWriteOptions(options, schema);
+ return Optional.of(new PhoenixDatasourceWriter(writeOptions));
+ }
+
+ private PhoenixDataSourceWriteOptions createPhoenixDataSourceWriteOptions(DataSourceOptions options,
+ StructType schema) {
+ String scn = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE).orElse(null);
+ String tenantId = options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null);
+ String zkUrl = options.get(ZOOKEEPER_URL).get();
+ boolean skipNormalizingIdentifier = options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false);
+ return new PhoenixDataSourceWriteOptions.Builder().setTableName(options.tableName().get())
+ .setZkUrl(zkUrl).setScn(scn).setTenantId(tenantId).setSchema(schema)
+ .setSkipNormalizingIdentifier(skipNormalizingIdentifier).build();
+ }
+
+ @Override
+ public String shortName() {
+ return "phoenix";
+ }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
new file mode 100644
index 0000000..8c2fdb1
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import java.io.Serializable;
+
+public class PhoenixDataSourceReadOptions implements Serializable {
+
+ private final String tenantId;
+ private final String zkUrl;
+ private final String scn;
+ private final String selectStatement;
+
+ public PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId, String selectStatement) {
+ this.zkUrl = zkUrl;
+ this.scn = scn;
+ this.tenantId = tenantId;
+ this.selectStatement = selectStatement;
+ }
+
+ public String getSelectStatement() {
+ return selectStatement;
+ }
+
+ public String getScn() {
+ return scn;
+ }
+
+ public String getZkUrl() {
+ return zkUrl;
+ }
+
+ public String getTenantId() {
+ return tenantId;
+ }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
new file mode 100644
index 0000000..446d96f
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.RegionSizeCalculator;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.spark.FilterExpressionCompiler;
+import org.apache.phoenix.spark.SparkSchemaUtil;
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
+import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.types.StructType;
+import scala.Tuple3;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
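+/**
+ * DataSource V2 reader for Phoenix tables. Compiles pushed-down Spark filters into a Phoenix
+ * WHERE clause via {@link FilterExpressionCompiler}, supports column pruning, and plans
+ * {@link PhoenixInputPartition}s from the scans of the optimized Phoenix query plan.
+ */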
+public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
+ SupportsPushDownRequiredColumns {
+
+ private final DataSourceOptions options;
+ private final String tableName;
+ private final String zkUrl;
+ private final boolean dateAsTimestamp;
+
+ private StructType schema;
+ private Filter[] pushedFilters = new Filter[]{};
+ // derived from pushedFilters
+ private String whereClause;
+
+ public PhoenixDataSourceReader(DataSourceOptions options) {
+ if (!options.tableName().isPresent()) {
+ throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
+ }
+ if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
+ throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
+ }
+ this.options = options;
+ this.tableName = options.tableName().get();
+        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
+ this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+ setSchema();
+ }
+
+ /**
+ * Sets the schema using all the table columns before any column pruning has been done
+ */
+ private void setSchema() {
+ try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl)) {
+ List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
+ Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
+ schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+ }
+ catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public StructType readSchema() {
+ return schema;
+ }
+
+ @Override
+ public Filter[] pushFilters(Filter[] filters) {
+ Tuple3<String, Filter[], Filter[]> tuple3 = new FilterExpressionCompiler().pushFilters(filters);
+ whereClause = tuple3._1();
+ pushedFilters = tuple3._3();
+ return tuple3._2();
+ }
+
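+    /**
+     * Plans the input partitions from the optimized Phoenix query plan: one partition per
+     * statistics-delimited scan when split-by-stats is enabled, otherwise one partition per
+     * region's group of scans.
+     */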
+ @Override
+ public List<InputPartition<InternalRow>> planInputPartitions() {
+ Optional<String> currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+ Optional<String> tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        // Generate splits based on statistics, or just on region boundaries?
+ boolean splitByStats = options.getBoolean(
+ PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
+ Properties overridingProps = new Properties();
+        if (currentScnValue.isPresent()) {
+            overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
+        }
+        if (tenantId.isPresent()) {
+ overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
+ }
+ try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps)) {
+ List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, Lists.newArrayList(schema.names()));
+ final Statement statement = conn.createStatement();
+ final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause);
+ Preconditions.checkNotNull(selectStatement);
+
+ final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+ // Optimize the query plan so that we potentially use secondary indexes
+ final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+ final Scan scan = queryPlan.getContext().getScan();
+
+ // setting the snapshot configuration
+ Optional<String> snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+            if (snapshotName.isPresent()) {
+                PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
+                        getQueryServices().getConfiguration(), snapshotName.get());
+            }
+
+ // Initialize the query plan so it sets up the parallel scans
+ queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
+
+ List<KeyRange> allSplits = queryPlan.getSplits();
+ // Get the RegionSizeCalculator
+ PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
+ org.apache.hadoop.hbase.client.Connection connection =
+ phxConn.getQueryServices().getAdmin().getConnection();
+ RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(queryPlan
+ .getTableRef().getTable().getPhysicalName().toString()));
+ RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection
+ .getAdmin());
+
+ final List<InputPartition<InternalRow>> partitions = Lists.newArrayListWithExpectedSize(allSplits.size());
+ for (List<Scan> scans : queryPlan.getScans()) {
+ // Get the region location
+ HRegionLocation location = regionLocator.getRegionLocation(
+ scans.get(0).getStartRow(),
+ false
+ );
+
+ String regionLocation = location.getHostname();
+
+ // Get the region size
+ long regionSize = sizeCalculator.getRegionSize(
+ location.getRegionInfo().getRegionName()
+ );
+
+ PhoenixDataSourceReadOptions phoenixDataSourceOptions = new PhoenixDataSourceReadOptions(zkUrl,
+ currentScnValue.orElse(null), tenantId.orElse(null), selectStatement);
+ if (splitByStats) {
+ for (Scan aScan : scans) {
+ partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
+ new PhoenixInputSplit(Collections.singletonList(aScan), regionSize, regionLocation)));
+ }
+ } else {
+ partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
+ new PhoenixInputSplit(scans, regionSize, regionLocation)));
+ }
+ }
+ return partitions;
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to plan query", e);
+ }
+ }
+
+ @Override
+ public Filter[] pushedFilters() {
+ return pushedFilters;
+ }
+
+ @Override
+ public void pruneColumns(StructType schema) {
+ this.schema = schema;
+ }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
new file mode 100644
index 0000000..624ff0f
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.spark.SerializableWritable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
+
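+/**
+ * Wraps a {@link PhoenixInputSplit} (plus the read options and pruned schema) so it can be
+ * shipped to the executors, where it creates the {@link PhoenixInputPartitionReader}.
+ */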
+public class PhoenixInputPartition implements InputPartition<InternalRow> {
+
+ private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+ private StructType schema;
+ private PhoenixDataSourceReadOptions options;
+
+ public PhoenixInputPartition(PhoenixDataSourceReadOptions options, StructType schema, PhoenixInputSplit phoenixInputSplit) {
+ this.phoenixInputSplit = new SerializableWritable<>(phoenixInputSplit);
+ this.schema = schema;
+ this.options = options;
+ }
+
+ @Override
+ public InputPartitionReader<InternalRow> createPartitionReader() {
+ return new PhoenixInputPartitionReader(options, schema, phoenixInputSplit);
+ }
+
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
new file mode 100644
index 0000000..30e84db
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.RoundRobinResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.spark.SerializableWritable;
+import org.apache.spark.executor.InputMetrics;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.SparkJdbcUtil;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import scala.collection.Iterator;
+
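+/**
+ * Executor-side reader: re-optimizes the generated SELECT statement, runs the scans of its
+ * {@link PhoenixInputSplit} through Phoenix result iterators and exposes the rows as Spark
+ * {@link InternalRow}s.
+ */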
+public class PhoenixInputPartitionReader implements InputPartitionReader<InternalRow> {
+
+ private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+ private StructType schema;
+ private Iterator<InternalRow> iterator;
+ private PhoenixResultSet resultSet;
+ private InternalRow currentRow;
+ private PhoenixDataSourceReadOptions options;
+
+ public PhoenixInputPartitionReader(PhoenixDataSourceReadOptions options, StructType schema, SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
+ this.options = options;
+ this.phoenixInputSplit = phoenixInputSplit;
+ this.schema = schema;
+ initialize();
+ }
+
+ private QueryPlan getQueryPlan() throws SQLException {
+ String scn = options.getScn();
+ String tenantId = options.getTenantId();
+ String zkUrl = options.getZkUrl();
+ Properties overridingProps = new Properties();
+ if (scn != null) {
+ overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+ }
+ if (tenantId != null) {
+ overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ }
+ try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps)) {
+ final Statement statement = conn.createStatement();
+ final String selectStatement = options.getSelectStatement();
+ Preconditions.checkNotNull(selectStatement);
+
+ final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+ // Optimize the query plan so that we potentially use secondary indexes
+ final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+ return queryPlan;
+ }
+ }
+
+ private void initialize() {
+ try {
+ final QueryPlan queryPlan = getQueryPlan();
+ final List<Scan> scans = phoenixInputSplit.value().getScans();
+ List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+ StatementContext ctx = queryPlan.getContext();
+ ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
+ String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
+
+ // Clear the table region boundary cache to make sure long running jobs stay up to date
+ byte[] tableNameBytes = queryPlan.getTableRef().getTable().getPhysicalName().getBytes();
+ ConnectionQueryServices services = queryPlan.getContext().getConnection().getQueryServices();
+ services.clearTableRegionCache(tableNameBytes);
+
+ long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
+ for (Scan scan : scans) {
+ // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
+ scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
+
+ PeekingResultIterator peekingResultIterator;
+ ScanMetricsHolder scanMetricsHolder =
+ ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
+ queryPlan.getContext().getConnection().getLogLevel());
+ final TableResultIterator tableResultIterator =
+ new TableResultIterator(
+ queryPlan.getContext().getConnection().getMutationState(), scan,
+ scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
+ MapReduceParallelScanGrouper.getInstance());
+ peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
+ iterators.add(peekingResultIterator);
+ }
+ ResultIterator iterator = queryPlan.useRoundRobinIterator() ? RoundRobinResultIterator.newIterator(iterators, queryPlan) : ConcatResultIterator.newIterator(iterators);
+ if (queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
+ iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
+ }
+ // Clone the row projector as it's not thread safe and would be used simultaneously by
+ // multiple threads otherwise.
+ this.resultSet = new PhoenixResultSet(iterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
+ this.iterator = SparkJdbcUtil.resultSetToSparkInternalRows(resultSet, schema, new InputMetrics());
+ }
+ catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean next() {
+ if (!iterator.hasNext()) {
+ return false;
+ }
+ currentRow = iterator.next();
+ return true;
+ }
+
+ @Override
+ public InternalRow get() {
+ return currentRow;
+ }
+
+ @Override
+ public void close() throws IOException {
+        if (resultSet != null) {
+ try {
+ resultSet.close();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
new file mode 100644
index 0000000..781d1c8
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.types.StructType;
+
+import java.io.Serializable;
+
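+/**
+ * Serializable write-side configuration (table name, ZK url, SCN, tenant id, schema and the
+ * identifier-normalization flag) passed from the driver to each {@link PhoenixDataWriter}.
+ */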
+public class PhoenixDataSourceWriteOptions implements Serializable {
+
+ private final String tableName;
+ private final String zkUrl;
+ private final String tenantId;
+ private final String scn;
+ private final StructType schema;
+ private final boolean skipNormalizingIdentifier;
+
+ private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, String scn, String tenantId,
+ StructType schema, boolean skipNormalizingIdentifier) {
+ this.tableName = tableName;
+ this.zkUrl = zkUrl;
+ this.scn = scn;
+ this.tenantId = tenantId;
+ this.schema = schema;
+ this.skipNormalizingIdentifier = skipNormalizingIdentifier;
+ }
+
+ public String getScn() {
+ return scn;
+ }
+
+ public String getZkUrl() {
+ return zkUrl;
+ }
+
+ public String getTenantId() {
+ return tenantId;
+ }
+
+ public StructType getSchema() {
+ return schema;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public boolean skipNormalizingIdentifier() {
+ return skipNormalizingIdentifier;
+ }
+
+ public static class Builder {
+ private String tableName;
+ private String zkUrl;
+ private String scn;
+ private String tenantId;
+ private StructType schema;
+ private boolean skipNormalizingIdentifier;
+
+ public Builder setTableName(String tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ public Builder setZkUrl(String zkUrl) {
+ this.zkUrl = zkUrl;
+ return this;
+ }
+
+ public Builder setScn(String scn) {
+ this.scn = scn;
+ return this;
+ }
+
+ public Builder setTenantId(String tenantId) {
+ this.tenantId = tenantId;
+ return this;
+ }
+
+ public Builder setSchema(StructType schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ public Builder setSkipNormalizingIdentifier(boolean skipNormalizingIdentifier) {
+ this.skipNormalizingIdentifier = skipNormalizingIdentifier;
+ return this;
+ }
+
+ public PhoenixDataSourceWriteOptions build() {
+ return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, tenantId, schema, skipNormalizingIdentifier);
+ }
+ }
+}
\ No newline at end of file
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
new file mode 100644
index 0000000..cf42aa5
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -0,0 +1,100 @@
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.SparkJdbcUtil;
+import org.apache.spark.sql.execution.datasources.jdbc.PhoenixJdbcDialect$;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import com.google.common.collect.Lists;
+
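+/**
+ * Writes Spark rows to Phoenix by binding them to a prepared UPSERT statement; the connection
+ * is committed once per task in {@link #commit()}.
+ */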
+public class PhoenixDataWriter implements DataWriter<InternalRow> {
+
+ private final StructType schema;
+ private final Connection conn;
+ private final PreparedStatement statement;
+
+ public PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
+ String scn = options.getScn();
+ String tenantId = options.getTenantId();
+ String zkUrl = options.getZkUrl();
+ Properties overridingProps = new Properties();
+ if (scn != null) {
+ overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+ }
+ if (tenantId != null) {
+ overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ }
+ this.schema = options.getSchema();
+ try {
+ this.conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps);
+ List<String> colNames = Lists.newArrayList(options.getSchema().names());
+            if (!options.skipNormalizingIdentifier()) {
+ colNames = colNames.stream().map(colName -> SchemaUtil.normalizeIdentifier(colName)).collect(Collectors.toList());
+ }
+ String upsertSql = QueryUtil.constructUpsertStatement(options.getTableName(), colNames, null);
+ this.statement = this.conn.prepareStatement(upsertSql);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void write(InternalRow internalRow) throws IOException {
+ try {
+            int i = 0;
+            // Convert the InternalRow to a Row once per record instead of once per column
+            Row row = SparkJdbcUtil.toRow(schema, internalRow);
+            for (StructField field : schema.fields()) {
+                DataType dataType = field.dataType();
+                if (internalRow.isNullAt(i)) {
+                    statement.setNull(i + 1, SparkJdbcUtil.getJdbcType(dataType,
+                            PhoenixJdbcDialect$.MODULE$).jdbcNullType());
+                } else {
+                    SparkJdbcUtil.makeSetter(conn, PhoenixJdbcDialect$.MODULE$, dataType).apply(statement, row, i);
+ }
+ ++i;
+ }
+ statement.execute();
+ } catch (SQLException e) {
+ throw new IOException("Exception while executing Phoenix prepared statement", e);
+ }
+ }
+
+ @Override
+ public WriterCommitMessage commit() throws IOException {
+ try {
+ conn.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ statement.close();
+ conn.close();
+ }
+ catch (SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
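+        // No commit message is needed: PhoenixDatasourceWriter ignores the messages it receives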
+ return null;
+ }
+
+ @Override
+ public void abort() throws IOException {
+ }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
new file mode 100644
index 0000000..751fdfa
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
@@ -0,0 +1,19 @@
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+
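+/**
+ * Creates a {@link PhoenixDataWriter} per task using the serialized write options.
+ */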
+public class PhoenixDataWriterFactory implements DataWriterFactory<InternalRow> {
+
+ private final PhoenixDataSourceWriteOptions options;
+
+ public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
+ this.options = options;
+ }
+
+ @Override
+ public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
+ return new PhoenixDataWriter(options);
+ }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
new file mode 100644
index 0000000..7847609
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
@@ -0,0 +1,34 @@
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
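+/**
+ * DataSource V2 writer for Phoenix tables; each task commits its own Phoenix connection, so the
+ * driver-side commit and abort hooks are no-ops.
+ */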
+public class PhoenixDatasourceWriter implements DataSourceWriter {
+
+ private final PhoenixDataSourceWriteOptions options;
+
+ public PhoenixDatasourceWriter(PhoenixDataSourceWriteOptions options) {
+ this.options = options;
+ }
+
+ @Override
+ public DataWriterFactory<InternalRow> createWriterFactory() {
+ return new PhoenixDataWriterFactory(options);
+ }
+
+ @Override
+ public boolean useCommitCoordinator() {
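+        // Phoenix UPSERTs of the same rows are idempotent, so coordinating which task attempt
+        // commits is unnecessary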
+ return false;
+ }
+
+ @Override
+ public void commit(WriterCommitMessage[] messages) {
+ }
+
+ @Override
+ public void abort(WriterCommitMessage[] messages) {
+ }
+}
diff --git a/phoenix-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/phoenix-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..6eff1af
--- /dev/null
+++ b/phoenix-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
\ No newline at end of file
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
index ca476e7..d555954 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
@@ -21,6 +21,7 @@
import scala.collection.JavaConversions._
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
object ConfigurationUtil extends Serializable {
def getOutputConfiguration(tableName: String, columns: Seq[String], zkUrl: Option[String], tenantId: Option[String] = None, conf: Option[Configuration] = None): Configuration = {
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index be4a32b..3b0289d 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -22,7 +22,7 @@
import scala.collection.JavaConversions._
-
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
class DataFrameFunctions(data: DataFrame) extends Serializable {
def saveToPhoenix(parameters: Map[String, String]): Unit = {
saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"),
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
index e000b74..ccdf595 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
@@ -20,6 +20,7 @@
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
class DefaultSource extends RelationProvider with CreatableRelationProvider {
// Override 'RelationProvider.createRelation', this enables DataFrame.load()
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
new file mode 100644
index 0000000..74ff67e
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import java.sql.Timestamp
+import java.text.Format
+
+import org.apache.phoenix.util.{DateUtil, SchemaUtil}
+import org.apache.phoenix.util.StringUtil.escapeStringConstant
+import org.apache.spark.sql.sources._
+
+import scala.collection.mutable.ArrayBuffer
+
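+/**
+ * Compiles Spark SQL [[org.apache.spark.sql.sources.Filter]]s into a Phoenix-compatible WHERE
+ * clause and reports back the filters that could not be pushed down.
+ */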
+class FilterExpressionCompiler() {
+
+ val timeformatter:Format = DateUtil.getTimestampFormatter(DateUtil.DEFAULT_TIME_FORMAT, DateUtil.DEFAULT_TIME_ZONE_ID)
+
+ /**
+    * Attempts to create a Phoenix-accepted WHERE clause from the Spark filters,
+    * mostly inspired by Spark SQL's JDBCRDD and the couchbase-spark-connector
+    *
+    * @return a tuple of the WHERE clause (derived from the supported filters),
+    * the array of unsupported filters and the array of supported filters
+ */
+ def pushFilters(filters: Array[Filter]): (String, Array[Filter], Array[Filter]) = {
+ if (filters.isEmpty) {
+ return ("" , Array[Filter](), Array[Filter]())
+ }
+
+    val filter = new StringBuilder("")
+    val unsupportedFilters = ArrayBuffer[Filter]()
+
+    filters.foreach(f => {
+      // Remember the current length so that any " AND" appended for a filter that turns out
+      // to be unsupported can be rolled back
+      val lengthBeforeFilter = filter.length
+      // Assume conjunction for multiple filters, unless otherwise specified
+      if (filter.nonEmpty) {
+        filter.append(" AND")
+      }
+
+ f match {
+ // Spark 1.3.1+ supported filters
+        case And(leftFilter, rightFilter) => {
+          val(whereClause, currUnsupportedFilters, _) = pushFilters(Array(leftFilter, rightFilter))
+          if (currUnsupportedFilters.isEmpty) {
+            filter.append(whereClause)
+          } else {
+            filter.setLength(lengthBeforeFilter)
+            unsupportedFilters += f
+          }
+        }
+        case Or(leftFilter, rightFilter) => {
+          val(whereLeftClause, leftUnsupportedFilters, _) = pushFilters(Array(leftFilter))
+          val(whereRightClause, rightUnsupportedFilters, _) = pushFilters(Array(rightFilter))
+          if (leftUnsupportedFilters.isEmpty && rightUnsupportedFilters.isEmpty) {
+            filter.append(whereLeftClause + " OR " + whereRightClause)
+          } else {
+            filter.setLength(lengthBeforeFilter)
+            unsupportedFilters += f
+          }
+        }
+        case Not(aFilter) => {
+          val(whereClause, currUnsupportedFilters, _) = pushFilters(Array(aFilter))
+          if (currUnsupportedFilters.isEmpty) {
+            filter.append(" NOT " + whereClause)
+          } else {
+            filter.setLength(lengthBeforeFilter)
+            unsupportedFilters += f
+          }
+        }
+ case EqualTo(attr, value) => filter.append(s" ${escapeKey(attr)} = ${compileValue(value)}")
+ case GreaterThan(attr, value) => filter.append(s" ${escapeKey(attr)} > ${compileValue(value)}")
+ case GreaterThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} >= ${compileValue(value)}")
+ case LessThan(attr, value) => filter.append(s" ${escapeKey(attr)} < ${compileValue(value)}")
+ case LessThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} <= ${compileValue(value)}")
+ case IsNull(attr) => filter.append(s" ${escapeKey(attr)} IS NULL")
+ case IsNotNull(attr) => filter.append(s" ${escapeKey(attr)} IS NOT NULL")
+ case In(attr, values) => filter.append(s" ${escapeKey(attr)} IN ${values.map(compileValue).mkString("(", ",", ")")}")
+ case StringStartsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue(value + "%")}")
+ case StringEndsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value)}")
+ case StringContains(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value + "%")}")
+        case _ => {
+          // Roll back any " AND" appended for this filter and let Spark evaluate it instead
+          filter.setLength(lengthBeforeFilter)
+          unsupportedFilters += f
+        }
+ }
+    })
+
+    (filter.toString(), unsupportedFilters.toArray, filters diff unsupportedFilters)
+ }
+
+ // Helper function to escape string values in SQL queries
+ private def compileValue(value: Any): Any = value match {
+ case stringValue: String => s"'${escapeStringConstant(stringValue)}'"
+
+ case timestampValue: Timestamp => getTimestampString(timestampValue)
+
+ // Borrowed from 'elasticsearch-hadoop', support these internal UTF types across Spark versions
+ // Spark 1.4
+ case utf if (isClass(utf, "org.apache.spark.sql.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'"
+ // Spark 1.5
+ case utf if (isClass(utf, "org.apache.spark.unsafe.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'"
+
+ // Pass through anything else
+ case _ => value
+ }
+
+ private def getTimestampString(timestampValue: Timestamp): String = {
+ "TO_TIMESTAMP('%s', '%s', '%s')".format(timeformatter.format(timestampValue),
+ DateUtil.DEFAULT_TIME_FORMAT, DateUtil.DEFAULT_TIME_ZONE_ID)
+ }
+
+ // Helper function to escape column key to work with SQL queries
+ private def escapeKey(key: String): String = SchemaUtil.getEscapedFullColumnName(key)
+
+ private def isClass(obj: Any, className: String) = {
+ className.equals(obj.getClass().getName())
+ }
+}
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index d604e0e..7331a5f 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -21,9 +21,6 @@
import org.apache.phoenix.jdbc.PhoenixDriver
import org.apache.phoenix.mapreduce.PhoenixInputFormat
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
-import org.apache.phoenix.query.QueryConstants
-import org.apache.phoenix.schema.types._
-import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
@@ -32,6 +29,7 @@
import scala.collection.JavaConverters._
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
predicate: Option[String] = None,
zkUrl: Option[String] = None,
@@ -126,7 +124,7 @@
// Lookup the Spark catalyst types from the Phoenix schema
- val structFields = phoenixSchemaToCatalystSchema(columnInfoList).toArray
+ val structType = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoList, dateAsTimestamp)
// Create the data frame from the converted Spark schema
sqlContext.createDataFrame(map(pr => {
@@ -146,60 +144,7 @@
// Create a Spark Row from the sequence
Row.fromSeq(rowSeq)
- }), new StructType(structFields))
+ }), structType)
}
- def normalizeColumnName(columnName: String) = {
- val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
- var normalizedColumnName = ""
- if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
- normalizedColumnName = unescapedColumnName
- }
- else {
- // split by separator to get the column family and column name
- val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
- normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
- }
- normalizedColumnName
- }
-
- def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo]) = columnList.map(ci => {
- val structType = phoenixTypeToCatalystType(ci)
- StructField(normalizeColumnName(ci.getColumnName), structType)
- })
-
-
- // Lookup table for Phoenix types to Spark catalyst types
- def phoenixTypeToCatalystType(columnInfo: ColumnInfo): DataType = columnInfo.getPDataType match {
- case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
- case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
- case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType
- case t if t.isInstanceOf[PSmallint] || t.isInstanceOf[PUnsignedSmallint] => ShortType
- case t if t.isInstanceOf[PTinyint] || t.isInstanceOf[PUnsignedTinyint] => ByteType
- case t if t.isInstanceOf[PFloat] || t.isInstanceOf[PUnsignedFloat] => FloatType
- case t if t.isInstanceOf[PDouble] || t.isInstanceOf[PUnsignedDouble] => DoubleType
- // Use Spark system default precision for now (explicit to work with < 1.5)
- case t if t.isInstanceOf[PDecimal] =>
- if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale)
- case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType
- case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType
- case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && !dateAsTimestamp => DateType
- case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && dateAsTimestamp => TimestampType
- case t if t.isInstanceOf[PBoolean] => BooleanType
- case t if t.isInstanceOf[PVarbinary] || t.isInstanceOf[PBinary] => BinaryType
- case t if t.isInstanceOf[PIntegerArray] || t.isInstanceOf[PUnsignedIntArray] => ArrayType(IntegerType, containsNull = true)
- case t if t.isInstanceOf[PBooleanArray] => ArrayType(BooleanType, containsNull = true)
- case t if t.isInstanceOf[PVarcharArray] || t.isInstanceOf[PCharArray] => ArrayType(StringType, containsNull = true)
- case t if t.isInstanceOf[PVarbinaryArray] || t.isInstanceOf[PBinaryArray] => ArrayType(BinaryType, containsNull = true)
- case t if t.isInstanceOf[PLongArray] || t.isInstanceOf[PUnsignedLongArray] => ArrayType(LongType, containsNull = true)
- case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(IntegerType, containsNull = true)
- case t if t.isInstanceOf[PTinyintArray] || t.isInstanceOf[PUnsignedTinyintArray] => ArrayType(ByteType, containsNull = true)
- case t if t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true)
- case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true)
- case t if t.isInstanceOf[PDecimalArray] => ArrayType(
- if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale), containsNull = true)
- case t if t.isInstanceOf[PTimestampArray] || t.isInstanceOf[PUnsignedTimestampArray] => ArrayType(TimestampType, containsNull = true)
- case t if t.isInstanceOf[PDateArray] || t.isInstanceOf[PUnsignedDateArray] => ArrayType(TimestampType, containsNull = true)
- case t if t.isInstanceOf[PTimeArray] || t.isInstanceOf[PUnsignedTimeArray] => ArrayType(TimestampType, containsNull = true)
- }
}
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
index c35cc54..6d4c4cc 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
@@ -20,7 +20,7 @@
import org.joda.time.DateTime
import scala.collection.{mutable, immutable}
-
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
class PhoenixRecordWritable(columnMetaDataList: List[ColumnInfo]) extends DBWritable {
val upsertValues = mutable.ArrayBuffer[Any]()
val resultMap = mutable.Map[String, AnyRef]()
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index 38bf29a..2f6ea8c 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -19,12 +19,11 @@
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.sql.sources._
-import org.apache.phoenix.util.StringUtil.escapeStringConstant
-import org.apache.phoenix.util.SchemaUtil
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Boolean = false)(@transient val sqlContext: SQLContext)
extends BaseRelation with PrunedFilteredScan {
@@ -36,7 +35,7 @@
but this prevents having to load the whole table into Spark first.
*/
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
- val(pushedFilters, unhandledFilters) = buildFilter(filters)
+ val(pushedFilters, _, _) = new FilterExpressionCompiler().pushFilters(filters)
new PhoenixRDD(
sqlContext.sparkContext,
tableName,
@@ -61,71 +60,10 @@
).toDataFrame(sqlContext).schema
}
- // Attempt to create Phoenix-accepted WHERE clauses from Spark filters,
- // mostly inspired from Spark SQL JDBCRDD and the couchbase-spark-connector
- private def buildFilter(filters: Array[Filter]): (String, Array[Filter]) = {
- if (filters.isEmpty) {
- return ("" , Array[Filter]())
- }
-
- val filter = new StringBuilder("")
- val unsupportedFilters = Array[Filter]();
- var i = 0
-
- filters.foreach(f => {
- // Assume conjunction for multiple filters, unless otherwise specified
- if (i > 0) {
- filter.append(" AND")
- }
-
- f match {
- // Spark 1.3.1+ supported filters
- case And(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter, rightFilter)))
- case Or(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter)) + " OR " + buildFilter(Array(rightFilter)))
- case Not(aFilter) => filter.append(" NOT " + buildFilter(Array(aFilter)))
- case EqualTo(attr, value) => filter.append(s" ${escapeKey(attr)} = ${compileValue(value)}")
- case GreaterThan(attr, value) => filter.append(s" ${escapeKey(attr)} > ${compileValue(value)}")
- case GreaterThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} >= ${compileValue(value)}")
- case LessThan(attr, value) => filter.append(s" ${escapeKey(attr)} < ${compileValue(value)}")
- case LessThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} <= ${compileValue(value)}")
- case IsNull(attr) => filter.append(s" ${escapeKey(attr)} IS NULL")
- case IsNotNull(attr) => filter.append(s" ${escapeKey(attr)} IS NOT NULL")
- case In(attr, values) => filter.append(s" ${escapeKey(attr)} IN ${values.map(compileValue).mkString("(", ",", ")")}")
- case StringStartsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue(value + "%")}")
- case StringEndsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value)}")
- case StringContains(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value + "%")}")
- case _ => unsupportedFilters :+ f
- }
-
- i = i + 1
- })
-
- (filter.toString(), unsupportedFilters)
- }
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
- val(pushedFilters, unhandledFilters) = buildFilter(filters)
+ val (_, unhandledFilters, _) = new FilterExpressionCompiler().pushFilters(filters)
unhandledFilters
}
- // Helper function to escape column key to work with SQL queries
- private def escapeKey(key: String): String = SchemaUtil.getEscapedArgument(key)
-
- // Helper function to escape string values in SQL queries
- private def compileValue(value: Any): Any = value match {
- case stringValue: String => s"'${escapeStringConstant(stringValue)}'"
-
- // Borrowed from 'elasticsearch-hadoop', support these internal UTF types across Spark versions
- // Spark 1.4
- case utf if (isClass(utf, "org.apache.spark.sql.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'"
- // Spark 1.5
- case utf if (isClass(utf, "org.apache.spark.unsafe.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'"
-
- // Pass through anything else
- case _ => value
- }
-
- private def isClass(obj: Any, className: String) = {
- className.equals(obj.getClass().getName())
- }
}
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
index 1b33e6e..b073521 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
@@ -21,6 +21,7 @@
import scala.collection.JavaConversions._
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Serializable {
def saveToPhoenix(tableName: String, cols: Seq[String],
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala
index 476ce8a..1b377ab 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala
@@ -17,6 +17,7 @@
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
class SparkContextFunctions(@transient val sc: SparkContext) extends Serializable {
/*
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
new file mode 100644
index 0000000..f69e988
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.phoenix.query.QueryConstants
+import org.apache.phoenix.schema.types._
+import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
+import org.apache.spark.sql.types._
+
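+/**
+ * Maps Phoenix column metadata (including array and decimal precision handling) to the
+ * corresponding Spark Catalyst schema.
+ */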
+object SparkSchemaUtil {
+
+ def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false) : StructType = {
+ val structFields = columnList.map(ci => {
+ val structType = phoenixTypeToCatalystType(ci, dateAsTimestamp)
+ StructField(normalizeColumnName(ci.getColumnName), structType)
+ })
+ new StructType(structFields.toArray)
+ }
+
+ def normalizeColumnName(columnName: String) = {
+ val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
+ var normalizedColumnName = ""
+ if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
+ normalizedColumnName = unescapedColumnName
+ }
+ else {
+ // split by separator to get the column family and column name
+ val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
+ normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
+ }
+ normalizedColumnName
+ }
+
+
+ // Lookup table for Phoenix types to Spark catalyst types
+ def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
+ case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
+ case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
+ case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType
+ case t if t.isInstanceOf[PSmallint] || t.isInstanceOf[PUnsignedSmallint] => ShortType
+ case t if t.isInstanceOf[PTinyint] || t.isInstanceOf[PUnsignedTinyint] => ByteType
+ case t if t.isInstanceOf[PFloat] || t.isInstanceOf[PUnsignedFloat] => FloatType
+ case t if t.isInstanceOf[PDouble] || t.isInstanceOf[PUnsignedDouble] => DoubleType
+ // Use Spark system default precision for now (explicit to work with < 1.5)
+ case t if t.isInstanceOf[PDecimal] =>
+ if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale)
+ case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType
+ case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType
+ case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && !dateAsTimestamp => DateType
+ case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && dateAsTimestamp => TimestampType
+ case t if t.isInstanceOf[PBoolean] => BooleanType
+ case t if t.isInstanceOf[PVarbinary] || t.isInstanceOf[PBinary] => BinaryType
+ case t if t.isInstanceOf[PIntegerArray] || t.isInstanceOf[PUnsignedIntArray] => ArrayType(IntegerType, containsNull = true)
+ case t if t.isInstanceOf[PBooleanArray] => ArrayType(BooleanType, containsNull = true)
+ case t if t.isInstanceOf[PVarcharArray] || t.isInstanceOf[PCharArray] => ArrayType(StringType, containsNull = true)
+ case t if t.isInstanceOf[PVarbinaryArray] || t.isInstanceOf[PBinaryArray] => ArrayType(BinaryType, containsNull = true)
+ case t if t.isInstanceOf[PLongArray] || t.isInstanceOf[PUnsignedLongArray] => ArrayType(LongType, containsNull = true)
+ case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(IntegerType, containsNull = true)
+ case t if t.isInstanceOf[PTinyintArray] || t.isInstanceOf[PUnsignedTinyintArray] => ArrayType(ByteType, containsNull = true)
+ case t if t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true)
+ case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true)
+ case t if t.isInstanceOf[PDecimalArray] => ArrayType(
+ if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale), containsNull = true)
+ case t if t.isInstanceOf[PTimestampArray] || t.isInstanceOf[PUnsignedTimestampArray] => ArrayType(TimestampType, containsNull = true)
+ case t if t.isInstanceOf[PDateArray] || t.isInstanceOf[PUnsignedDateArray] => ArrayType(TimestampType, containsNull = true)
+ case t if t.isInstanceOf[PTimeArray] || t.isInstanceOf[PUnsignedTimeArray] => ArrayType(TimestampType, containsNull = true)
+ }
+
+}
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala
index a0842c9..f9154ad 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala
@@ -16,6 +16,7 @@
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.{DataFrame, SQLContext}
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
class SparkSqlContextFunctions(@transient val sqlContext: SQLContext) extends Serializable {
/*
diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
new file mode 100644
index 0000000..712ec2d
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
@@ -0,0 +1,21 @@
+package org.apache.spark.sql.execution.datasources.jdbc
+
+import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
+import org.apache.spark.sql.types._
+
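+/**
+ * Minimal JDBC dialect used when binding Spark rows to the Phoenix UPSERT statement.
+ */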
+private object PhoenixJdbcDialect extends JdbcDialect {
+
+ override def canHandle(url: String): Boolean = url.startsWith("jdbc:phoenix")
+
+ /**
+ * This is only called for ArrayType (see JdbcUtils.makeSetter)
+ */
+ override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+ case StringType => Some(JdbcType("VARCHAR", java.sql.Types.VARCHAR))
+ case BinaryType => Some(JdbcType("BINARY(" + dt.defaultSize + ")", java.sql.Types.BINARY))
+ case ByteType => Some(JdbcType("TINYINT", java.sql.Types.TINYINT))
+ case _ => None
+ }
+
+
+}
\ No newline at end of file
diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
new file mode 100644
index 0000000..eac483a
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.sql.{Connection, PreparedStatement, ResultSet}
+import java.util.Locale
+
+import org.apache.spark.executor.InputMetrics
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
+import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.NextIterator
+
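+/**
+ * Conversions between JDBC result sets / rows and Spark's InternalRow, largely adapted from
+ * Spark's own JdbcUtils and kept in a Spark package so that Spark-internal helpers can be reused.
+ */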
+object SparkJdbcUtil {
+
+ def toRow(schema: StructType, internalRow: InternalRow) : Row = {
+ val encoder = RowEncoder(schema).resolveAndBind()
+ encoder.fromRow(internalRow)
+ }
+
+  // A `JDBCValueGetter` is responsible for getting a value from `ResultSet` into a field
+  // of the `InternalRow`. The last `Int` argument is the 0-based index of the field to set
+  // in the row; the corresponding `ResultSet` column is read at index + 1.
+ private type JDBCValueGetter = (ResultSet, InternalRow, Int) => Unit
+
+ private def nullSafeConvert[T](input: T, f: T => Any): Any = {
+ if (input == null) {
+ null
+ } else {
+ f(input)
+ }
+ }
+
+ /**
+ * Creates `JDBCValueGetter`s according to [[StructType]], which can set
+ * each value from `ResultSet` to each field of [[InternalRow]] correctly.
+ */
+ private def makeGetters(schema: StructType): Array[JDBCValueGetter] =
+ schema.fields.map(sf => makeGetter(sf.dataType, sf.metadata))
+
+ private def makeGetter(dt: DataType, metadata: Metadata): JDBCValueGetter = dt match {
+ case BooleanType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ row.setBoolean(pos, rs.getBoolean(pos + 1))
+
+ case DateType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ // DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
+ val dateVal = rs.getDate(pos + 1)
+ if (dateVal != null) {
+ row.setInt(pos, DateTimeUtils.fromJavaDate(dateVal))
+ } else {
+ row.update(pos, null)
+ }
+
+    // When connecting to Oracle through JDBC, the precision and scale of the BigDecimal
+    // returned by ResultSet.getBigDecimal do not always match the table schema reported by
+    // ResultSetMetaData.getPrecision and ResultSetMetaData.getScale. For example, after
+    // inserting 19999 into a NUMBER(12, 2) column, getBigDecimal returns a value with scale 0,
+    // while the DataFrame schema correctly reports DecimalType(12, 2). Saving that DataFrame to
+    // a Parquet file and reading it back then yields the wrong result 199.99.
+    // The precision and scale therefore need to be set on the Decimal from the JDBC metadata.
+ case DecimalType.Fixed(p, s) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ val decimal =
+ nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
+ row.update(pos, decimal)
+
+ case DoubleType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ row.setDouble(pos, rs.getDouble(pos + 1))
+
+ case FloatType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ row.setFloat(pos, rs.getFloat(pos + 1))
+
+ case IntegerType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ row.setInt(pos, rs.getInt(pos + 1))
+
+ case LongType if metadata.contains("binarylong") =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ val bytes = rs.getBytes(pos + 1)
+ var ans = 0L
+ var j = 0
+ while (j < bytes.length) {
+ ans = 256 * ans + (255 & bytes(j))
+ j = j + 1
+ }
+ row.setLong(pos, ans)
+
+ case LongType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ row.setLong(pos, rs.getLong(pos + 1))
+
+ case ShortType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ row.setShort(pos, rs.getShort(pos + 1))
+
+ case StringType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ // TODO(davies): use getBytes for better performance, if the encoding is UTF-8
+ row.update(pos, UTF8String.fromString(rs.getString(pos + 1)))
+
+ case TimestampType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ val t = rs.getTimestamp(pos + 1)
+ if (t != null) {
+ row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
+ } else {
+ row.update(pos, null)
+ }
+
+ case BinaryType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ row.update(pos, rs.getBytes(pos + 1))
+
+ case ByteType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ row.update(pos, rs.getByte(pos + 1))
+
+ case ArrayType(et, _) =>
+ val elementConversion = et match {
+ case TimestampType =>
+ (array: Object) =>
+ array.asInstanceOf[Array[java.sql.Timestamp]].map { timestamp =>
+ nullSafeConvert(timestamp, DateTimeUtils.fromJavaTimestamp)
+ }
+
+ case StringType =>
+ (array: Object) =>
+          // some underlying types are not String, such as uuid, inet, cidr, etc.
+ array.asInstanceOf[Array[java.lang.Object]]
+ .map(obj => if (obj == null) null else UTF8String.fromString(obj.toString))
+
+ case DateType =>
+ (array: Object) =>
+ array.asInstanceOf[Array[java.sql.Date]].map { date =>
+ nullSafeConvert(date, DateTimeUtils.fromJavaDate)
+ }
+
+ case dt: DecimalType =>
+ (array: Object) =>
+ array.asInstanceOf[Array[java.math.BigDecimal]].map { decimal =>
+ nullSafeConvert[java.math.BigDecimal](
+ decimal, d => Decimal(d, dt.precision, dt.scale))
+ }
+
+ case LongType if metadata.contains("binarylong") =>
+ throw new IllegalArgumentException(s"Unsupported array element " +
+ s"type ${dt.catalogString} based on binary")
+
+ case ArrayType(_, _) =>
+ throw new IllegalArgumentException("Nested arrays unsupported")
+
+ case _ => (array: Object) => array.asInstanceOf[Array[Any]]
+ }
+
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ val array = nullSafeConvert[java.sql.Array](
+ input = rs.getArray(pos + 1),
+ array => new GenericArrayData(elementConversion.apply(array.getArray)))
+ row.update(pos, array)
+
+ case _ => throw new IllegalArgumentException(s"Unsupported type ${dt.catalogString}")
+ }
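+
+  // Minimal sketch of a getter in action for a DECIMAL(12, 2) column (rs and mutableRow as in
+  // resultSetToSparkInternalRows below): even if the driver returns a BigDecimal with the wrong
+  // scale (e.g. 19999 with scale 0), the value is rescaled to the schema's precision and scale.
+  //   val getter = makeGetter(DecimalType(12, 2), Metadata.empty)
+  //   getter(rs, mutableRow, 0)         // reads rs.getBigDecimal(1)
+  //   mutableRow.getDecimal(0, 12, 2)   // => 19999.00 rather than 19999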
+
+ // TODO just use JdbcUtils.resultSetToSparkInternalRows in Spark 3.0 (see SPARK-26499)
+ def resultSetToSparkInternalRows(
+ resultSet: ResultSet,
+ schema: StructType,
+ inputMetrics: InputMetrics): Iterator[InternalRow] = {
+ // JdbcUtils.resultSetToSparkInternalRows(resultSet, schema, inputMetrics)
+ new NextIterator[InternalRow] {
+ private[this] val rs = resultSet
+ private[this] val getters: Array[JDBCValueGetter] = makeGetters(schema)
+ private[this] val mutableRow = new SpecificInternalRow(schema.fields.map(x => x.dataType))
+
+ override protected def close(): Unit = {
+ try {
+ rs.close()
+ } catch {
+          case _: Exception => // ignore exceptions thrown while closing the ResultSet
+ }
+ }
+
+ override protected def getNext(): InternalRow = {
+ if (rs.next()) {
+ inputMetrics.incRecordsRead(1)
+ var i = 0
+ while (i < getters.length) {
+ getters(i).apply(rs, mutableRow, i)
+ if (rs.wasNull) mutableRow.setNullAt(i)
+ i = i + 1
+ }
+ mutableRow
+ } else {
+ finished = true
+ null.asInstanceOf[InternalRow]
+ }
+ }
+ }
+ }
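+
+  // Minimal sketch of the reader-side call pattern, assuming it runs inside a Spark task with
+  // `rs` positioned before its first row and `schema` matching the projected columns:
+  //   val metrics = org.apache.spark.TaskContext.get().taskMetrics().inputMetrics
+  //   val rows: Iterator[Row] = resultSetToSparkInternalRows(rs, schema, metrics)
+  //     .map(internalRow => toRow(schema, internalRow))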
+
+  // A `JDBCValueSetter` is responsible for setting a value from a `Row` into a field of a
+  // `PreparedStatement`. The last `Int` argument is the (0-based) index of the value in the
+  // `Row`; the same index, plus one, is the parameter position in the SQL statement.
+ private type JDBCValueSetter = (PreparedStatement, Row, Int) => Unit
+
+  // Taken from Spark's JdbcUtils.scala; it cannot be used directly because that method is private.
+ def makeSetter(
+ conn: Connection,
+ dialect: JdbcDialect,
+ dataType: DataType): JDBCValueSetter = dataType match {
+ case IntegerType =>
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ stmt.setInt(pos + 1, row.getInt(pos))
+
+ case LongType =>
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ stmt.setLong(pos + 1, row.getLong(pos))
+
+ case DoubleType =>
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ stmt.setDouble(pos + 1, row.getDouble(pos))
+
+ case FloatType =>
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ stmt.setFloat(pos + 1, row.getFloat(pos))
+
+ case ShortType =>
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ stmt.setInt(pos + 1, row.getShort(pos))
+
+ case ByteType =>
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ stmt.setInt(pos + 1, row.getByte(pos))
+
+ case BooleanType =>
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ stmt.setBoolean(pos + 1, row.getBoolean(pos))
+
+ case StringType =>
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ stmt.setString(pos + 1, row.getString(pos))
+
+ case BinaryType =>
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos))
+
+ case TimestampType =>
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos))
+
+ case DateType =>
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos))
+
+ case t: DecimalType =>
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ stmt.setBigDecimal(pos + 1, row.getDecimal(pos))
+
+ case ArrayType(et, _) =>
+ // remove type length parameters from end of type name
+ val typeName = getJdbcType(et, dialect).databaseTypeDefinition
+ .toLowerCase(Locale.ROOT).split("\\(")(0)
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ val array = conn.createArrayOf(
+ typeName,
+ row.getSeq[AnyRef](pos).toArray)
+ stmt.setArray(pos + 1, array)
+
+ case _ =>
+ (_: PreparedStatement, _: Row, pos: Int) =>
+ throw new IllegalArgumentException(
+ s"Can't translate non-null value for field $pos")
+ }
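+
+  // Minimal writer-side sketch (table and variable names are illustrative): build one setter per
+  // column, then bind each Row's values into an UPSERT statement.
+  //   val stmt = conn.prepareStatement("UPSERT INTO OUTPUT_TABLE VALUES (?, ?)")
+  //   val setters = schema.fields.map(f => makeSetter(conn, PhoenixJdbcDialect, f.dataType))
+  //   setters.zipWithIndex.foreach { case (setter, i) => setter(stmt, row, i) }
+  //   stmt.executeUpdate()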
+
+ // taken from Spark JdbcUtils
+ def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = {
+ dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse(
+ throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.catalogString}"))
+ }
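+
+  // For example, with PhoenixJdbcDialect (values follow from the mappings defined there):
+  //   getJdbcType(StringType, PhoenixJdbcDialect).databaseTypeDefinition  // "VARCHAR"
+  //   getJdbcType(BinaryType, PhoenixJdbcDialect).databaseTypeDefinition  // "BINARY(100)"
+  // makeSetter's ArrayType case lower-cases this name and strips the "(...)" length suffix, so
+  // "BINARY(100)" becomes "binary" before Connection.createArrayOf is called.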
+
+}