PHOENIX-6683 Surround the OR filters with parentheses while convertin… (#73)
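
Spark hands pushFilters the top-level conjuncts of a query's WHERE
clause as separate filters, and FilterExpressionCompiler joins the
compiled clauses with " AND ". An Or filter used to be compiled as
"left OR right" with no surrounding parentheses, so a Spark SQL query
such as

    SELECT count(*) FROM T
    WHERE ENTITY_INSTANCE_ID = 40
      AND (CANCELLATION_FLAG <> 'Y' OR CANCELLATION_FLAG IS NULL)
      AND CASE_MATCH_TYPE = 'M'

was pushed down to Phoenix with a WHERE clause roughly like the
following (illustrative; the exact text of the compiled sub-clauses
may differ):

    ENTITY_INSTANCE_ID = 40 AND CANCELLATION_FLAG != 'Y'
    OR CANCELLATION_FLAG IS NULL AND CASE_MATCH_TYPE = 'M'

Because AND binds tighter than OR, Phoenix evaluated this as
(A AND B) OR (C AND D) and returned the wrong rows. Wrapping every
compiled Or filter in parentheses preserves the intended grouping.
The added tests cover both a parenthesized OR combined with AND
filters and a bare AND/OR mix that relies on default precedence.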

diff --git a/phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java b/phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java
index 664bc94..043bb32 100644
--- a/phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java
+++ b/phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java
@@ -29,6 +29,7 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -283,6 +284,75 @@
     }
 
     @Test
+    public void testCombinationOfOrAndFilters() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String tableName1 = generateUniqueName();
+            String ddl = "CREATE TABLE  " + tableName1 +
+                    "  (ENTITY_INSTANCE_ID BIGINT NOT NULL, MEMBER_ID VARCHAR(50) NOT NULL, CASE_ID VARCHAR(250) NOT NULL," +
+                    " CANCELLATION_FLAG VARCHAR(1), CASE_MATCH_TYPE CHAR(1), CONSTRAINT PK1 PRIMARY KEY(ENTITY_INSTANCE_ID, " +
+                    "MEMBER_ID, CASE_ID))\n";
+            createTestTable(getUrl(), ddl);
+            SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+            Dataset<Row> phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                    .option(DataSourceOptions.TABLE_KEY, tableName1)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName1);
+            String dml = "UPSERT INTO " + tableName1 + " VALUES(?,?,?,?,?)";
+            PreparedStatement stmt = conn.prepareStatement(dml);
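+            // Three rows share ENTITY_INSTANCE_ID 40 (CANCELLATION_FLAG 'Y', NULL and 'N'); a fourth row uses 41.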
+            stmt.setInt(1, 40);
+            stmt.setString(2, "a");
+            stmt.setString(3, "b");
+            stmt.setString(4, "Y");
+            stmt.setString(5, "M");
+            stmt.execute();
+            stmt.setInt(1, 40);
+            stmt.setString(2, "c");
+            stmt.setString(3, "d");
+            stmt.setNull(4, Types.VARCHAR);
+            stmt.setString(5, "M");
+            stmt.execute();
+            stmt.setInt(1, 40);
+            stmt.setString(2, "e");
+            stmt.setString(3, "f");
+            stmt.setString(4, "N");
+            stmt.setString(5, "M");
+            stmt.execute();
+            stmt.setInt(1, 41);
+            stmt.setString(2, "f");
+            stmt.setString(3, "g");
+            stmt.setString(4, "N");
+            stmt.setString(5, "C");
+            stmt.execute();
+            conn.commit();
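+            // The pushed-down OR must stay parenthesized: rows (40, c, d) and (40, e, f) match.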
+            String query =
+                    "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and  " +
+                            "(CANCELLATION_FLAG <> 'Y' OR CANCELLATION_FLAG IS NULL ) and CASE_MATCH_TYPE='M'";
+            Dataset<Row> dataset =
+                    sqlContext.sql(query);
+            List<Row> rows = dataset.collectAsList();
+            ResultSet rs = new SparkResultSet(rows, dataset.columns());
+            assertTrue(rs.next());
+            assertEquals(2, rs.getLong(1));
+            assertFalse(rs.next());
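+            // AND binds tighter than OR, so this is (id = 40 AND type = 'M') OR flag <> 'Y': all four rows match.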
+            query =
+                    "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and CASE_MATCH_TYPE='M' " +
+                            " OR CANCELLATION_FLAG <> 'Y'";
+            dataset =
+                    sqlContext.sql(query);
+            rows = dataset.collectAsList();
+            rs = new SparkResultSet(rows, dataset.columns());
+            assertTrue(rs.next());
+            assertEquals(4, rs.getLong(1));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
     public void testOrderByWithExpression() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
diff --git a/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala b/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
index a2ec2dc..111f021 100644
--- a/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
+++ b/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
@@ -66,7 +66,8 @@
           val(whereLeftClause, leftUnsupportedFilters, _) = pushFilters(Array(leftFilter))
           val(whereRightClause, rightUnsupportedFilters, _) = pushFilters(Array(rightFilter))
           if (leftUnsupportedFilters.isEmpty && rightUnsupportedFilters.isEmpty) {
-            filter.append(whereLeftClause + " OR " + whereRightClause)
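+            // Parenthesize the compiled OR so it keeps its grouping when joined to sibling filters with " AND "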
+            filter.append("(" + whereLeftClause + " OR " + whereRightClause + ")")
           }
           else {
             unsupportedFilters :+ f
diff --git a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/OrderByIT.java b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/OrderByIT.java
index 0b51436..e7aa424 100644
--- a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/OrderByIT.java
+++ b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/OrderByIT.java
@@ -29,6 +29,7 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -284,6 +285,75 @@
     }
 
     @Test
+    public void testCombinationOfOrAndFilters() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String tableName1 = generateUniqueName();
+            String ddl = "CREATE TABLE  " + tableName1 +
+                    "  (ENTITY_INSTANCE_ID BIGINT NOT NULL, MEMBER_ID VARCHAR(50) NOT NULL, CASE_ID VARCHAR(250) NOT NULL," +
+                    " CANCELLATION_FLAG VARCHAR(1), CASE_MATCH_TYPE CHAR(1), CONSTRAINT PK1 PRIMARY KEY(ENTITY_INSTANCE_ID, " +
+                    "MEMBER_ID, CASE_ID))\n";
+            createTestTable(getUrl(), ddl);
+            SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+            Dataset<Row> phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                    .option("table", tableName1)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName1);
+            String dml = "UPSERT INTO " + tableName1 + " VALUES(?,?,?,?,?)";
+            PreparedStatement stmt = conn.prepareStatement(dml);
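+            // Three rows share ENTITY_INSTANCE_ID 40 (CANCELLATION_FLAG 'Y', NULL and 'N'); a fourth row uses 41.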
+            stmt.setInt(1, 40);
+            stmt.setString(2, "a");
+            stmt.setString(3, "b");
+            stmt.setString(4, "Y");
+            stmt.setString(5, "M");
+            stmt.execute();
+            stmt.setInt(1, 40);
+            stmt.setString(2, "c");
+            stmt.setString(3, "d");
+            stmt.setNull(4, Types.VARCHAR);
+            stmt.setString(5, "M");
+            stmt.execute();
+            stmt.setInt(1, 40);
+            stmt.setString(2, "e");
+            stmt.setString(3, "f");
+            stmt.setString(4, "N");
+            stmt.setString(5, "M");
+            stmt.execute();
+            stmt.setInt(1, 41);
+            stmt.setString(2, "f");
+            stmt.setString(3, "g");
+            stmt.setString(4, "N");
+            stmt.setString(5, "C");
+            stmt.execute();
+            conn.commit();
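+            // The pushed-down OR must stay parenthesized: rows (40, c, d) and (40, e, f) match.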
+            String query =
+                    "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and  " +
+                            "(CANCELLATION_FLAG <> 'Y' OR CANCELLATION_FLAG IS NULL ) and CASE_MATCH_TYPE='M'";
+            Dataset<Row> dataset =
+                    sqlContext.sql(query);
+            List<Row> rows = dataset.collectAsList();
+            ResultSet rs = new SparkResultSet(rows, dataset.columns());
+            assertTrue(rs.next());
+            assertEquals(2, rs.getLong(1));
+            assertFalse(rs.next());
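+            // AND binds tighter than OR, so this is (id = 40 AND type = 'M') OR flag <> 'Y': all four rows match.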
+            query =
+                    "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and CASE_MATCH_TYPE='M' " +
+                            " OR CANCELLATION_FLAG <> 'Y'";
+            dataset =
+                    sqlContext.sql(query);
+            rows = dataset.collectAsList();
+            rs = new SparkResultSet(rows, dataset.columns());
+            assertTrue(rs.next());
+            assertEquals(4, rs.getLong(1));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
     public void testOrderByWithExpression() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
diff --git a/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala b/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
index 8b9b481..77c172a 100644
--- a/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
+++ b/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
@@ -66,7 +66,8 @@
           val(whereLeftClause, leftUnsupportedFilters, _) = pushFilters(Array(leftFilter))
           val(whereRightClause, rightUnsupportedFilters, _) = pushFilters(Array(rightFilter))
           if (leftUnsupportedFilters.isEmpty && rightUnsupportedFilters.isEmpty) {
-            filter.append(whereLeftClause + " OR " + whereRightClause)
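+            // Parenthesize the compiled OR so it keeps its grouping when joined to sibling filters with " AND "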
+            filter.append("(" + whereLeftClause + " OR " + whereRightClause + ")")
           }
           else {
             unsupportedFilters :+ f