/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.phoenix.spark
import java.sql.{Connection, DriverManager}
import java.util.Date
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, HBaseTestingUtility}
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
import org.apache.phoenix.query.BaseTest
import org.apache.phoenix.schema.{TableNotFoundException, ColumnNotFoundException}
import org.apache.phoenix.schema.types.PVarchar
import org.apache.phoenix.util.{SchemaUtil, ColumnInfo}
import org.apache.spark.sql.{SaveMode, execution, SQLContext}
import org.apache.spark.sql.types.{LongType, DataType, StringType, StructField}
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import org.scalatest._
import org.apache.phoenix.spark._
import scala.collection.mutable.ListBuffer
/*
Note: If running directly from an IDE, these are the recommended VM parameters:
-Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
*/
// Helper object to access the protected static methods hidden in BaseHBaseManagedTimeIT
object PhoenixSparkITHelper extends BaseHBaseManagedTimeIT {
def getTestClusterConfig = BaseHBaseManagedTimeIT.getTestClusterConfig
def doSetup = BaseHBaseManagedTimeIT.doSetup()
def doTeardown = BaseHBaseManagedTimeIT.doTeardown()
def getUrl = BaseTest.getUrl
}
class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
var conn: Connection = _
var sc: SparkContext = _
lazy val hbaseConfiguration = {
val conf = PhoenixSparkITHelper.getTestClusterConfig
// The ZooKeeper quorum address defaults to "localhost", which is incorrect for the test cluster, so fix it here
val quorum = conf.get("hbase.zookeeper.quorum")
val clientPort = conf.get("hbase.zookeeper.property.clientPort")
val znodeParent = conf.get("zookeeper.znode.parent")
conf.set(HConstants.ZOOKEEPER_QUORUM, s"$quorum:$clientPort:$znodeParent")
conf
}
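// Convenience value for tests that pass the cluster location as a zkUrl string
// instead of a full Hadoop Configuration object.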
lazy val quorumAddress = {
hbaseConfiguration.get(HConstants.ZOOKEEPER_QUORUM)
}
override def beforeAll() {
PhoenixSparkITHelper.doSetup
conn = DriverManager.getConnection(PhoenixSparkITHelper.getUrl)
conn.setAutoCommit(true)
// Each SQL statement used to set up Phoenix must be on a single line, even though
// that can result in very long lines.
val setupSqlSource = getClass.getClassLoader.getResourceAsStream("setup.sql")
val setupSql = scala.io.Source.fromInputStream(setupSqlSource).getLines()
.filter(line => ! line.startsWith("--") && ! line.isEmpty)
for (sql <- setupSql) {
val stmt = conn.createStatement()
stmt.execute(sql)
}
conn.commit()
val conf = new SparkConf()
.setAppName("PhoenixSparkIT")
.setMaster("local[2]") // 2 threads, some parallelism
.set("spark.ui.showConsoleProgress", "false") // Disable printing stage progress
sc = new SparkContext(conf)
}
override def afterAll() {
conn.close()
sc.stop()
PhoenixSparkITHelper.doTeardown
}
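// Verifies that a Phoenix VARCHAR column maps to a nullable Catalyst StringType field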
test("Can convert Phoenix schema") {
val phoenixSchema = List(
new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
)
val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
conf = hbaseConfiguration)
val catalystSchema = rdd.phoenixSchemaToCatalystSchema(phoenixSchema)
val expected = List(StructField("varcharColumn", StringType, nullable = true))
catalystSchema shouldEqual expected
}
test("Can create schema RDD and execute query") {
val sqlContext = new SQLContext(sc)
val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
df1.registerTempTable("sql_table_1")
val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
conf = hbaseConfiguration)
df2.registerTempTable("sql_table_2")
val sqlRdd = sqlContext.sql("""
|SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
|INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
)
val count = sqlRdd.count()
count shouldEqual 6L
}
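// "table3" is a case-sensitive table name, so it must be escaped before being handed
// to Phoenix; this test also exercises the zkUrl code path rather than a full Configuration.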
test("Can create schema RDD and execute query on case sensitive table (no config)") {
val sqlContext = new SQLContext(sc)
val df1 = sqlContext.phoenixTableAsDataFrame(
SchemaUtil.getEscapedArgument("table3"),
Array("id", "col1"),
zkUrl = Some(quorumAddress))
df1.registerTempTable("table3")
val sqlRdd = sqlContext.sql("SELECT * FROM table3")
val count = sqlRdd.count()
count shouldEqual 2L
}
test("Can create schema RDD and execute constrained query") {
val sqlContext = new SQLContext(sc)
val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"),
conf = hbaseConfiguration)
df1.registerTempTable("sql_table_1")
val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
predicate = Some("\"ID\" = 1"),
conf = hbaseConfiguration)
df2.registerTempTable("sql_table_2")
val sqlRdd = sqlContext.sql("""
|SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
|INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
)
val count = sqlRdd.count()
count shouldEqual 1L
}
test("Using a predicate referring to a non-existent column should fail") {
intercept[Exception] {
val sqlContext = new SQLContext(sc)
val df1 = sqlContext.phoenixTableAsDataFrame(
SchemaUtil.getEscapedArgument("table3"),
Array("id", "col1"),
predicate = Some("foo = bar"),
conf = hbaseConfiguration)
df1.registerTempTable("table3")
val sqlRdd = sqlContext.sql("SELECT * FROM table3")
// we have to execute an action before the predicate failure can occur
val count = sqlRdd.count()
}.getCause shouldBe a[ColumnNotFoundException]
}
test("Can create schema RDD with predicate that will never match") {
val sqlContext = new SQLContext(sc)
val df1 = sqlContext.phoenixTableAsDataFrame(
SchemaUtil.getEscapedArgument("table3"),
Array("id", "col1"),
predicate = Some("\"id\" = -1"),
conf = hbaseConfiguration)
df1.registerTempTable("table3")
val sqlRdd = sqlContext.sql("SELECT * FROM table3")
val count = sqlRdd.count()
count shouldEqual 0L
}
test("Can create schema RDD with complex predicate") {
val sqlContext = new SQLContext(sc)
val df1 = sqlContext.phoenixTableAsDataFrame(
"DATE_PREDICATE_TEST_TABLE",
Array("ID", "TIMESERIES_KEY"),
predicate = Some("""
|ID > 0 AND TIMESERIES_KEY BETWEEN
|CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND
|CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)""".stripMargin),
conf = hbaseConfiguration)
df1.registerTempTable("date_predicate_test_table")
val sqlRdd = df1.sqlContext.sql("SELECT * FROM date_predicate_test_table")
val count = sqlRdd.count()
count shouldEqual 0L
}
test("Can query an array table") {
val sqlContext = new SQLContext(sc)
val df1 = sqlContext.phoenixTableAsDataFrame("ARRAY_TEST_TABLE", Array("ID", "VCARRAY"),
conf = hbaseConfiguration)
df1.registerTempTable("ARRAY_TEST_TABLE")
val sqlRdd = sqlContext.sql("SELECT * FROM ARRAY_TEST_TABLE")
val count = sqlRdd.count()
// get row 0, column 1, which should be "VCARRAY"
val arrayValues = sqlRdd.collect().apply(0).apply(1)
arrayValues should equal(Array("String1", "String2", "String3"))
count shouldEqual 1L
}
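// phoenixTableAsRDD returns rows keyed by column name, so values can be looked up directly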
test("Can read a table as an RDD") {
val rdd1 = sc.phoenixTableAsRDD("ARRAY_TEST_TABLE", Seq("ID", "VCARRAY"),
conf = hbaseConfiguration)
val count = rdd1.count()
val arrayValues = rdd1.take(1)(0)("VCARRAY")
arrayValues should equal(Array("String1", "String2", "String3"))
count shouldEqual 1L
}
test("Can save to phoenix table") {
val sqlContext = new SQLContext(sc)
val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
sc
.parallelize(dataSet)
.saveToPhoenix(
"OUTPUT_TEST_TABLE",
Seq("ID", "COL1", "COL2"),
hbaseConfiguration
)
// Load the results back
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
val results = ListBuffer[(Long, String, Int)]()
while (rs.next()) {
results.append((rs.getLong(1), rs.getString(2), rs.getInt(3)))
}
// Verify they match
results.indices.foreach { i =>
dataSet(i) shouldEqual results(i)
}
}
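// Both java.util.Date and Joda DateTime values should round-trip to Phoenix date columns,
// preserving the underlying epoch milliseconds.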
test("Can save Java and Joda dates to Phoenix (no config)") {
val dt = new DateTime()
val date = new Date()
val dataSet = List((1L, "1", 1, dt), (2L, "2", 2, date))
sc
.parallelize(dataSet)
.saveToPhoenix(
"OUTPUT_TEST_TABLE",
Seq("ID","COL1","COL2","COL3"),
zkUrl = Some(quorumAddress)
)
// Load the results back
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT COL3 FROM OUTPUT_TEST_TABLE WHERE ID = 1 OR ID = 2 ORDER BY ID ASC")
val results = ListBuffer[java.sql.Date]()
while (rs.next()) {
results.append(rs.getDate(1))
}
// Verify the epochs are equal
results(0).getTime shouldEqual dt.getMillis
results(1).getTime shouldEqual date.getTime
}
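// Passing an empty column list lets the connector derive the schema from Phoenix metadata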
test("Can infer schema without defining columns") {
val sqlContext = new SQLContext(sc)
val df = sqlContext.phoenixTableAsDataFrame("TABLE2", Seq(), conf = hbaseConfiguration)
df.schema("ID").dataType shouldEqual LongType
df.schema("TABLE1_ID").dataType shouldEqual LongType
df.schema("t2col1").dataType shouldEqual StringType
}
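// Exercises the Spark SQL data source entry point ("org.apache.phoenix.spark")
// configured with only the table name and zkUrl options.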
test("Spark SQL can use Phoenix as a data source with no schema specified") {
val sqlContext = new SQLContext(sc)
val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
"zkUrl" -> quorumAddress))
df.count() shouldEqual 2
df.schema("ID").dataType shouldEqual LongType
df.schema("COL1").dataType shouldEqual StringType
}
test("Spark SQL can use Phoenix as a data source with PrunedFilteredScan") {
val sqlContext = new SQLContext(sc)
val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
"zkUrl" -> quorumAddress))
val res = df.filter(df("COL1") === "test_row_1" && df("ID") === 1L).select(df("ID"))
// Make sure we got the right value back
assert(res.first().getLong(0) == 1L)
/*
NOTE: There doesn't appear to be any way of verifying from the Spark query planner that
filtering is being pushed down and done server-side. However, since PhoenixRelation
implements PrunedFilteredScan, debugging has shown that both the SELECT columns and WHERE
predicates are being passed along to us, which we then forward to Phoenix.
TODO: investigate further to find a way to verify server-side pushdown
*/
}
test("Can persist a dataframe using 'DataFrame.saveToPhoenix'") {
// Load from TABLE1
val sqlContext = new SQLContext(sc)
val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
"zkUrl" -> quorumAddress))
// Save to TABLE1_COPY
df.saveToPhoenix("TABLE1_COPY", zkUrl = Some(quorumAddress))
// Verify results
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY")
val checkResults = List((1L, "test_row_1"), (2L, "test_row_2"))
val results = ListBuffer[(Long, String)]()
while (rs.next()) {
results.append((rs.getLong(1), rs.getString(2)))
}
stmt.close()
results.toList shouldEqual checkResults
}
test("Can persist a dataframe using 'DataFrame.save()") {
// Clear TABLE1_COPY
var stmt = conn.createStatement()
stmt.executeUpdate("DELETE FROM TABLE1_COPY")
stmt.close()
// Load TABLE1, save as TABLE1_COPY
val sqlContext = new SQLContext(sc)
val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
"zkUrl" -> quorumAddress))
// Save to TABLE1_COPY
df.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "TABLE1_COPY", "zkUrl" -> quorumAddress))
// Verify results
stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY")
val checkResults = List((1L, "test_row_1"), (2L, "test_row_2"))
val results = ListBuffer[(Long, String)]()
while (rs.next()) {
results.append((rs.getLong(1), rs.getString(2)))
}
stmt.close()
results.toList shouldEqual checkResults
}
test("Can save arrays back to phoenix") {
val dataSet = List((2L, Array("String1", "String2", "String3")))
sc
.parallelize(dataSet)
.saveToPhoenix(
"ARRAY_TEST_TABLE",
Seq("ID","VCARRAY"),
zkUrl = Some(quorumAddress)
)
// Load the results back
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT VCARRAY FROM ARRAY_TEST_TABLE WHERE ID = 2")
rs.next()
val sqlArray = rs.getArray(1).getArray().asInstanceOf[Array[String]]
// Verify the arrays are equal
sqlArray shouldEqual dataSet(0)._2
}
test("Can read from table with schema and escaped table name") {
// Manually escape
val rdd1 = sc.phoenixTableAsRDD(
"CUSTOM_ENTITY.\"z02\"",
Seq("ID"),
conf = hbaseConfiguration)
var count = rdd1.count()
count shouldEqual 1L
// Use SchemaUtil
val rdd2 = sc.phoenixTableAsRDD(
SchemaUtil.getEscapedFullTableName("CUSTOM_ENTITY.z02"),
Seq("ID"),
conf = hbaseConfiguration)
count = rdd2.count()
count shouldEqual 1L
}
}