PHOENIX-2236 PHOENIX-2290 PHOENIX-2547 Various phoenix-spark fixes (Kalyan Hadoop)
diff --git a/phoenix-spark/src/it/resources/setup.sql b/phoenix-spark/src/it/resources/setup.sql
index aa2cee1..e56924f 100644
--- a/phoenix-spark/src/it/resources/setup.sql
+++ b/phoenix-spark/src/it/resources/setup.sql
@@ -48,3 +48,9 @@
 UPSERT INTO TEST_SMALL_TINY VALUES (1, 32767, 127)
 CREATE TABLE DATE_TEST(ID BIGINT NOT NULL PRIMARY KEY, COL1 DATE)
 UPSERT INTO DATE_TEST VALUES(1, CURRENT_DATE())
+CREATE TABLE "space" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR)
+UPSERT INTO "space" VALUES ('key1', 'xyz')
+CREATE TABLE "small" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR, "salary" INTEGER )
+UPSERT INTO "small" VALUES ('key1', 'foo', 10000)
+UPSERT INTO "small" VALUES ('key2', 'bar', 20000)
+UPSERT INTO "small" VALUES ('key3', 'xyz', 30000)
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index c216406..7d05f07 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -621,6 +621,55 @@
     assert(Math.abs(epoch - dt) < 86400000)
   }
 
+  test("Filter operation doesn't work for column names containing a white space (PHOENIX-2547)") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("space"),
+      "zkUrl" -> quorumAddress))
+    val res = df.filter(df.col("first name").equalTo("xyz"))
+    // Make sure we got the right value back
+    assert(res.collectAsList().size() == 1L)
+  }
+
+  test("Spark Phoenix cannot recognize Phoenix view fields (PHOENIX-2290)") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"),
+      "zkUrl" -> quorumAddress))
+    df.registerTempTable("temp")
+
+    // limitation: filter / where expressions are not allowed with "double quotes", instead of that pass it as column expressions
+    // reason: if the expression contains "double quotes" then spark sql parser, ignoring evaluating .. giving to next level to handle
+
+    val res1 = sqlContext.sql("select * from temp where salary = '10000' ")
+    assert(res1.collectAsList().size() == 1L)
+
+    val res2 = sqlContext.sql("select * from temp where \"salary\" = '10000' ")
+    assert(res2.collectAsList().size() == 0L)
+
+    val res3 = sqlContext.sql("select * from temp where salary > '10000' ")
+    assert(res3.collectAsList().size() == 2L)
+  }
+
+  test("Queries with small case column-names return empty result-set when working with Spark Datasource Plugin (PHOENIX-2336)") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"),
+      "zkUrl" -> quorumAddress))
+
+    // limitation: filter / where expressions are not allowed with "double quotes", instead of that pass it as column expressions
+    // reason: if the expression contains "double quotes" then spark sql parser, ignoring evaluating .. giving to next level to handle
+
+    val res1 = df.filter(df.col("first name").equalTo("foo"))
+    assert(res1.collectAsList().size() == 1L)
+
+    val res2 = df.filter("\"first name\" = 'foo'")
+    assert(res2.collectAsList().size() == 0L)
+
+    val res3 = df.filter("salary = '10000'")
+    assert(res3.collectAsList().size() == 1L)
+
+    val res4 = df.filter("salary > '10000'")
+    assert(res4.collectAsList().size() == 2L)
+  }
+
   test("Can coerce Phoenix DATE columns to TIMESTAMP through DataFrame API") {
     val sqlContext = new SQLContext(sc)
     val df = sqlContext.read
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index 8d7f9f7..d2eac8c 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -23,6 +23,7 @@
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.sources._
 import org.apache.phoenix.util.StringUtil.escapeStringConstant
+import org.apache.phoenix.util.SchemaUtil
 
 case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Boolean = false)(@transient val sqlContext: SQLContext)
     extends BaseRelation with PrunedFilteredScan {
@@ -80,17 +81,17 @@
         case And(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter, rightFilter)))
         case Or(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter)) + " OR " + buildFilter(Array(rightFilter)))
         case Not(aFilter) => filter.append(" NOT " + buildFilter(Array(aFilter)))
-        case EqualTo(attr, value) => filter.append(s" $attr = ${compileValue(value)}")
-        case GreaterThan(attr, value) => filter.append(s" $attr > ${compileValue(value)}")
-        case GreaterThanOrEqual(attr, value) => filter.append(s" $attr >= ${compileValue(value)}")
-        case LessThan(attr, value) => filter.append(s" $attr < ${compileValue(value)}")
-        case LessThanOrEqual(attr, value) => filter.append(s" $attr <= ${compileValue(value)}")
-        case IsNull(attr) => filter.append(s" $attr IS NULL")
-        case IsNotNull(attr) => filter.append(s" $attr IS NOT NULL")
-        case In(attr, values) => filter.append(s" $attr IN ${values.map(compileValue).mkString("(", ",", ")")}")
-        case StringStartsWith(attr, value) => filter.append(s" $attr LIKE ${compileValue(value + "%")}")
-        case StringEndsWith(attr, value) => filter.append(s" $attr LIKE ${compileValue("%" + value)}")
-        case StringContains(attr, value) => filter.append(s" $attr LIKE ${compileValue("%" + value + "%")}")
+        case EqualTo(attr, value) => filter.append(s" ${escapeKey(attr)} = ${compileValue(value)}")
+        case GreaterThan(attr, value) => filter.append(s" ${escapeKey(attr)} > ${compileValue(value)}")
+        case GreaterThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} >= ${compileValue(value)}")
+        case LessThan(attr, value) => filter.append(s" ${escapeKey(attr)} < ${compileValue(value)}")
+        case LessThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} <= ${compileValue(value)}")
+        case IsNull(attr) => filter.append(s" ${escapeKey(attr)} IS NULL")
+        case IsNotNull(attr) => filter.append(s" ${escapeKey(attr)} IS NOT NULL")
+        case In(attr, values) => filter.append(s" ${escapeKey(attr)} IN ${values.map(compileValue).mkString("(", ",", ")")}")
+        case StringStartsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue(value + "%")}")
+        case StringEndsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value)}")
+        case StringContains(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value + "%")}")
       }
 
       i = i + 1
@@ -99,6 +100,9 @@
     filter.toString()
   }
 
+  // Helper function to escape column key to work with SQL queries
+  private def escapeKey(key: String): String = SchemaUtil.getEscapedArgument(key)
+
   // Helper function to escape string values in SQL queries
   private def compileValue(value: Any): Any = value match {
     case stringValue: String => s"'${escapeStringConstant(stringValue)}'"