PHOENIX-2336 PHOENIX-2290 PHOENIX-2547 Various phoenix-spark fixes (Kalyan Hadoop)
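
Fixes three related phoenix-spark issues:

- PHOENIX-2547: filter expressions against column names containing
  whitespace produced invalid Phoenix SQL because the attribute name was
  interpolated unescaped into the pushed-down predicate. Column names are
  now wrapped with SchemaUtil.getEscapedArgument().
- PHOENIX-2290 / PHOENIX-2336: case-sensitive (quoted, lower-case) table
  and column names returned empty result sets through the Spark Datasource
  plugin. New integration tests cover DataFrame filters and Spark SQL
  queries against the case-sensitive "space" and "small" test tables.

Example of the behavior exercised by the new tests (a minimal sketch;
quorumAddress stands in for the test cluster's ZooKeeper quorum):

    val df = sqlContext.load("org.apache.phoenix.spark",
      Map("table" -> SchemaUtil.getEscapedArgument("small"),
          "zkUrl" -> quorumAddress))
    // the pushed-down predicate is now rendered as "first name" = 'foo'
    df.filter(df.col("first name").equalTo("foo")).count()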
diff --git a/phoenix-spark/src/it/resources/setup.sql b/phoenix-spark/src/it/resources/setup.sql
index aa2cee1..e56924f 100644
--- a/phoenix-spark/src/it/resources/setup.sql
+++ b/phoenix-spark/src/it/resources/setup.sql
@@ -48,3 +48,9 @@
UPSERT INTO TEST_SMALL_TINY VALUES (1, 32767, 127)
CREATE TABLE DATE_TEST(ID BIGINT NOT NULL PRIMARY KEY, COL1 DATE)
UPSERT INTO DATE_TEST VALUES(1, CURRENT_DATE())
+CREATE TABLE "space" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR)
+UPSERT INTO "space" VALUES ('key1', 'xyz')
+CREATE TABLE "small" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR, "salary" INTEGER )
+UPSERT INTO "small" VALUES ('key1', 'foo', 10000)
+UPSERT INTO "small" VALUES ('key2', 'bar', 20000)
+UPSERT INTO "small" VALUES ('key3', 'xyz', 30000)
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index c216406..7d05f07 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -621,6 +621,55 @@
assert(Math.abs(epoch - dt) < 86400000)
}
+ test("Filter operation doesn't work for column names containing a white space (PHOENIX-2547)") {
+ val sqlContext = new SQLContext(sc)
+ val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("space"),
+ "zkUrl" -> quorumAddress))
+ val res = df.filter(df.col("first name").equalTo("xyz"))
+ // Make sure we got the right value back
+ assert(res.collectAsList().size() == 1L)
+ }
+
+ test("Spark Phoenix cannot recognize Phoenix view fields (PHOENIX-2290)") {
+ val sqlContext = new SQLContext(sc)
+ val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"),
+ "zkUrl" -> quorumAddress))
+ df.registerTempTable("temp")
+
+ // limitation: filter / where expressions must not wrap column names in "double quotes"; use column expressions instead
+ // reason: the Spark SQL parser treats a double-quoted token as a string literal, so the predicate compares two constants and never reaches Phoenix as a column reference
+
+ val res1 = sqlContext.sql("select * from temp where salary = '10000' ")
+ assert(res1.collectAsList().size() == 1L)
+
+ val res2 = sqlContext.sql("select * from temp where \"salary\" = '10000' ")
+ assert(res2.collectAsList().size() == 0L)
+
+ val res3 = sqlContext.sql("select * from temp where salary > '10000' ")
+ assert(res3.collectAsList().size() == 2L)
+ }
+
+ test("Queries with small case column-names return empty result-set when working with Spark Datasource Plugin (PHOENIX-2336)") {
+ val sqlContext = new SQLContext(sc)
+ val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"),
+ "zkUrl" -> quorumAddress))
+
+ // same double-quote limitation as above: a double-quoted column name is parsed as a string literal, not a column reference
+
+ val res1 = df.filter(df.col("first name").equalTo("foo"))
+ assert(res1.collectAsList().size() == 1L)
+
+ val res2 = df.filter("\"first name\" = 'foo'")
+ assert(res2.collectAsList().size() == 0L)
+
+ val res3 = df.filter("salary = '10000'")
+ assert(res3.collectAsList().size() == 1L)
+
+ val res4 = df.filter("salary > '10000'")
+ assert(res4.collectAsList().size() == 2L)
+ }
+
test("Can coerce Phoenix DATE columns to TIMESTAMP through DataFrame API") {
val sqlContext = new SQLContext(sc)
val df = sqlContext.read
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index 8d7f9f7..d2eac8c 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -23,6 +23,7 @@
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.phoenix.util.StringUtil.escapeStringConstant
+import org.apache.phoenix.util.SchemaUtil
case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Boolean = false)(@transient val sqlContext: SQLContext)
extends BaseRelation with PrunedFilteredScan {
@@ -80,17 +81,17 @@
case And(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter, rightFilter)))
case Or(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter)) + " OR " + buildFilter(Array(rightFilter)))
case Not(aFilter) => filter.append(" NOT " + buildFilter(Array(aFilter)))
- case EqualTo(attr, value) => filter.append(s" $attr = ${compileValue(value)}")
- case GreaterThan(attr, value) => filter.append(s" $attr > ${compileValue(value)}")
- case GreaterThanOrEqual(attr, value) => filter.append(s" $attr >= ${compileValue(value)}")
- case LessThan(attr, value) => filter.append(s" $attr < ${compileValue(value)}")
- case LessThanOrEqual(attr, value) => filter.append(s" $attr <= ${compileValue(value)}")
- case IsNull(attr) => filter.append(s" $attr IS NULL")
- case IsNotNull(attr) => filter.append(s" $attr IS NOT NULL")
- case In(attr, values) => filter.append(s" $attr IN ${values.map(compileValue).mkString("(", ",", ")")}")
- case StringStartsWith(attr, value) => filter.append(s" $attr LIKE ${compileValue(value + "%")}")
- case StringEndsWith(attr, value) => filter.append(s" $attr LIKE ${compileValue("%" + value)}")
- case StringContains(attr, value) => filter.append(s" $attr LIKE ${compileValue("%" + value + "%")}")
+ case EqualTo(attr, value) => filter.append(s" ${escapeKey(attr)} = ${compileValue(value)}")
+ case GreaterThan(attr, value) => filter.append(s" ${escapeKey(attr)} > ${compileValue(value)}")
+ case GreaterThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} >= ${compileValue(value)}")
+ case LessThan(attr, value) => filter.append(s" ${escapeKey(attr)} < ${compileValue(value)}")
+ case LessThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} <= ${compileValue(value)}")
+ case IsNull(attr) => filter.append(s" ${escapeKey(attr)} IS NULL")
+ case IsNotNull(attr) => filter.append(s" ${escapeKey(attr)} IS NOT NULL")
+ case In(attr, values) => filter.append(s" ${escapeKey(attr)} IN ${values.map(compileValue).mkString("(", ",", ")")}")
+ case StringStartsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue(value + "%")}")
+ case StringEndsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value)}")
+ case StringContains(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value + "%")}")
}
i = i + 1
@@ -99,6 +100,11 @@
filter.toString()
}
+ // Helper function to escape a column name so that it is valid in Phoenix SQL queries
+ private def escapeKey(key: String): String = SchemaUtil.getEscapedArgument(key)
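+ // e.g. escapeKey("first name") wraps the name in double quotes, so the
+ // pushed-down predicate is rendered as "first name" = 'foo' rather than invalid SQL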
+
// Helper function to escape string values in SQL queries
private def compileValue(value: Any): Any = value match {
case stringValue: String => s"'${escapeStringConstant(stringValue)}'"