phoenix-spark extends Phoenix's MapReduce support to allow Spark to load Phoenix tables as DataFrames, and enables persisting DataFrames back to Phoenix.
Given a Phoenix table with the following DDL and DML:
CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
UPSERT INTO TABLE1 (ID, COL1) VALUES (1, 'test_row_1');
UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
Scala example:
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()

// Load data from TABLE1
val df = spark.sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .load

df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
  .select(df("ID"))
  .show
Java example:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;

public class PhoenixSparkRead {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);

        // Load data from TABLE1
        Dataset<Row> df = sqlContext
            .read()
            .format("phoenix")
            .option("table", "TABLE1")
            .option(ZOOKEEPER_URL, "phoenix-server:2181")
            .load();
        df.createOrReplaceTempView("TABLE1");

        // Query the temp view with Spark SQL (1L is a Spark SQL long literal)
        df = sqlContext.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L");
        df.show();

        jsc.stop();
    }
}
The save method on DataFrame allows passing in a data source type. You can use "phoenix" for DataSourceV2, and you must also pass in table and zkUrl parameters to specify which table and server to persist the DataFrame to. The column names are derived from the DataFrame's schema field names and must match the Phoenix column names. The save method also takes a SaveMode option, for which only SaveMode.Overwrite is supported.
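Because the column names come from the DataFrame's schema field names, a DataFrame whose fields are named differently has to be realigned before saving. Below is a minimal Scala sketch of that step; the mismatched field names my_id/my_col1 and the server address are illustrative only.

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession.builder().appName("phoenix-save-sketch").master("local").getOrCreate()
import spark.implicits._

// A DataFrame whose field names do NOT yet match the Phoenix columns (ID, COL1)
val raw = Seq((3L, "test_row_3")).toDF("my_id", "my_col1")

// Rename the fields so they line up with the Phoenix column names before saving
val aligned = raw
  .withColumnRenamed("my_id", "ID")
  .withColumnRenamed("my_col1", "COL1")

aligned.write
  .format("phoenix")
  .mode(SaveMode.Overwrite)  // only SaveMode.Overwrite is supported
  .options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .save()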
Given two Phoenix tables with the following DDL:
CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
you can load from an input table and save to an output table as a DataFrame as follows in Scala:
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()

// Load INPUT_TABLE
val df = spark.sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .load

// Save to OUTPUT_TABLE
df.write
  .format("phoenix")
  .mode(SaveMode.Overwrite)
  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .save()
Java example:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SQLContext;

import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;

public class PhoenixSparkWriteFromInputTable {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);

        // Load INPUT_TABLE
        Dataset<Row> df = sqlContext
            .read()
            .format("phoenix")
            .option("table", "INPUT_TABLE")
            .option(ZOOKEEPER_URL, "phoenix-server:2181")
            .load();

        // Save to OUTPUT_TABLE
        df.write()
            .format("phoenix")
            .mode(SaveMode.Overwrite)
            .option("table", "OUTPUT_TABLE")
            .option(ZOOKEEPER_URL, "phoenix-server:2181")
            .save();

        jsc.stop();
    }
}
Just like the previous example, you can pass in the data source type as "phoenix" and specify the table and zkUrl parameters indicating which table and server to persist the DataFrame to. Note that the schema of the RDD must match its column data, and both must match the schema of the Phoenix table that you save to.
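If the data arrives with a column of the wrong type, the types can be aligned with the Phoenix schema before saving. A minimal sketch, assuming (purely for illustration) that COL2 was read as a string:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.IntegerType

val spark = SparkSession.builder().appName("schema-align-sketch").master("local").getOrCreate()
import spark.implicits._

// Hypothetical input whose COL2 arrived as a string (e.g. from a CSV source)
val raw = Seq((1L, "1", "1"), (2L, "2", "2")).toDF("ID", "COL1", "COL2")

// Cast COL2 so the DataFrame schema matches the INTEGER column in OUTPUT_TABLE
val typed = raw.withColumn("COL2", raw("COL2").cast(IntegerType))
typed.printSchema()  // ID: long, COL1: string, COL2: integer

// 'typed' can now be written to OUTPUT_TABLE with the "phoenix" data source as shown below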
Given an output Phoenix table with the following DDL:
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
you can save a DataFrame created from an RDD as follows in Scala:
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField}
import org.apache.spark.sql.{Row, SQLContext, SparkSession, SaveMode}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()

val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))

val schema = StructType(
  Seq(StructField("ID", LongType, nullable = false),
    StructField("COL1", StringType),
    StructField("COL2", IntegerType)))

val rowRDD = spark.sparkContext.parallelize(dataSet)

// Apply the schema to the RDD.
val df = spark.sqlContext.createDataFrame(rowRDD, schema)

df.write
  .format("phoenix")
  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .mode(SaveMode.Overwrite)
  .save()
Java example:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;

public class PhoenixSparkWriteFromRDDWithSchema {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);
        SparkSession spark = sqlContext.sparkSession();
        Dataset<Row> df;

        // Generate the schema based on the fields
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("ID", DataTypes.LongType, false));
        fields.add(DataTypes.createStructField("COL1", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("COL2", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(fields);

        // Generate the rows with the same exact schema
        List<Row> rows = new ArrayList<>();
        for (int i = 1; i < 4; i++) {
            rows.add(RowFactory.create(Long.valueOf(i), String.valueOf(i), i));
        }

        // Create a DataFrame from the rows and the specified schema
        df = spark.createDataFrame(rows, schema);
        df.write()
            .format("phoenix")
            .mode(SaveMode.Overwrite)
            .option("table", "OUTPUT_TABLE")
            .option(ZOOKEEPER_URL, "phoenix-server:2181")
            .save();

        jsc.stop();
    }
}
"org.apache.phoenix.spark"
instead of "phoenix"
, however this is deprecated as of connectors-1.0.0
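As a minimal sketch of the deprecated V1 usage (mirroring the first read example above; the table name and server address are the same illustrative values, and new code should prefer "phoenix"):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("phoenix-v1-sketch").master("local").getOrCreate()

// Deprecated DataSourceV1 usage: same read as the "phoenix" examples above,
// only the format string (and the plain "zkUrl" option key) differ.
val dfV1 = spark.sqlContext
  .read
  .format("org.apache.phoenix.spark")
  .options(Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181"))
  .load

dfV1.show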
The functions phoenixTableAsDataFrame, phoenixTableAsRDD and saveToPhoenix all support optionally specifying a conf Hadoop configuration parameter with custom Phoenix client settings, as well as an optional zkUrl parameter for the Phoenix connection URL. If zkUrl isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set in the conf parameter. Similarly, if no configuration is passed in, zkUrl must be specified.

You can also pass configurations from the driver to the executors as a comma-separated list against the key phoenixConfigs, i.e. PhoenixDataSource.PHOENIX_CONFIGS, for example:

val df = spark
  .sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181",
    "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
  .load

This list of properties is parsed and populated into a properties map which is passed to DriverManager.getConnection(connString, propsMap). Note that the same property values will be used for both the driver and all executors, and these configurations are used each time a connection is made (both on the driver and the executors).

For example, to load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame directly using a Configuration object in Scala:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val configuration = new Configuration()
// Can set Phoenix-specific settings here; requires 'hbase.zookeeper.quorum'

val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)

// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
val df = sqlContext.phoenixTableAsDataFrame(
  "TABLE1", Array("ID", "COL1"), conf = configuration
)

df.show
To load TABLE1 as an RDD using a ZooKeeper URL in Scala:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._
import org.apache.spark.rdd.RDD

val sc = new SparkContext("local", "phoenix-test")

// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
)

rdd.count()

val firstId = rdd.first()("ID").asInstanceOf[Long]
val firstCol = rdd.first()("COL1").asInstanceOf[String]
saveToPhoenix is an implicit method on RDD[Product] (an RDD of tuples). The data types must correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html).
Given a Phoenix table with the following DDL:
CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
you can save an RDD of tuples to it as follows in Scala:

import org.apache.spark.SparkContext
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "phoenix-test")
val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))

sc
  .parallelize(dataSet)
  .saveToPhoenix(
    "OUTPUT_TEST_TABLE",
    Seq("ID", "COL1", "COL2"),
    zkUrl = Some("phoenix-server:2181")
  )
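As described above, saveToPhoenix also accepts a Hadoop Configuration through the conf parameter instead of zkUrl. A minimal sketch, assuming 'hbase.zookeeper.quorum' is available from the configuration (for example via an hbase-site.xml on the classpath):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.phoenix.spark._

// Assumes the Phoenix/HBase client settings, including 'hbase.zookeeper.quorum',
// are picked up from the classpath or set on this Configuration explicitly.
val configuration = new Configuration()

val sc = new SparkContext("local", "phoenix-test")
val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))

sc
  .parallelize(dataSet)
  .saveToPhoenix(
    "OUTPUT_TEST_TABLE",
    Seq("ID", "COL1", "COL2"),
    conf = configuration
  )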