PHOENIX-6683 Surround the OR filters with parentheses while convertin… (#73)
diff --git a/phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java b/phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java
index 664bc94..043bb32 100644
--- a/phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java
+++ b/phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java
@@ -29,6 +29,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -283,6 +284,72 @@
}
@Test
+ public void testCombinationOfOrAndFilters() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String tableName1 = generateUniqueName();
+ String ddl = "CREATE TABLE " + tableName1 +
+ " (ENTITY_INSTANCE_ID BIGINT NOT NULL, MEMBER_ID VARCHAR(50) NOT NULL, CASE_ID VARCHAR(250) NOT NULL," +
+ " CANCELLATION_FLAG VARCHAR(1), CASE_MATCH_TYPE CHAR(1), CONSTRAINT PK1 PRIMARY KEY(ENTITY_INSTANCE_ID, " +
+ "MEMBER_ID, CASE_ID))\n";
+ createTestTable(getUrl(), ddl);
+ SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+ Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+ .option(DataSourceOptions.TABLE_KEY, tableName1)
+ .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+ phoenixDataSet.createOrReplaceTempView(tableName1);
+ String dml = "UPSERT INTO " + tableName1 + " VALUES(?,?,?,?,?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ stmt.setInt(1, 40);
+ stmt.setString(2, "a");
+ stmt.setString(3, "b");
+ stmt.setString(4, "Y");
+ stmt.setString(5, "M");
+ stmt.execute();
+ stmt.setInt(1, 40);
+ stmt.setString(2, "c");
+ stmt.setString(3, "d");
+ stmt.setNull(4, Types.VARCHAR);
+ stmt.setString(5, "M");
+ stmt.execute();
+ stmt.setInt(1, 40);
+ stmt.setString(2, "e");
+ stmt.setString(3, "f");
+ stmt.setString(4, "N");
+ stmt.setString(5, "M");
+ stmt.execute();
+ stmt.setInt(1, 41);
+ stmt.setString(2, "f");
+ stmt.setString(3, "g");
+ stmt.setString(4, "N");
+ stmt.setString(5, "C");
+ stmt.execute();
+ conn.commit();
+ String query =
+ "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and " +
+ "(CANCELLATION_FLAG <> 'Y' OR CANCELLATION_FLAG IS NULL ) and CASE_MATCH_TYPE='M'";
+ Dataset<Row> dataset =
+ sqlContext.sql(query);
+ List<Row> rows = dataset.collectAsList();
+ ResultSet rs = new SparkResultSet(rows, dataset.columns());
+ assertTrue(rs.next());
+ assertEquals(2, rs.getLong(1));
+ assertFalse(rs.next());
+ query =
+ "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and CASE_MATCH_TYPE='M' " +
+ " OR CANCELLATION_FLAG <> 'Y'";
+ dataset =
+ sqlContext.sql(query);
+ rows = dataset.collectAsList();
+ rs = new SparkResultSet(rows, dataset.columns());
+ assertTrue(rs.next());
+ assertEquals(4, rs.getLong(1));
+ assertFalse(rs.next());
+ }
+ }
+
+ @Test
public void testOrderByWithExpression() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
diff --git a/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala b/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
index a2ec2dc..111f021 100644
--- a/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
+++ b/phoenix-spark-base/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
@@ -66,7 +66,7 @@
val(whereLeftClause, leftUnsupportedFilters, _) = pushFilters(Array(leftFilter))
val(whereRightClause, rightUnsupportedFilters, _) = pushFilters(Array(rightFilter))
if (leftUnsupportedFilters.isEmpty && rightUnsupportedFilters.isEmpty) {
- filter.append(whereLeftClause + " OR " + whereRightClause)
+ filter.append("(" + whereLeftClause + " OR " + whereRightClause + ")")
}
else {
unsupportedFilters :+ f
diff --git a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/OrderByIT.java b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/OrderByIT.java
index 0b51436..e7aa424 100644
--- a/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/OrderByIT.java
+++ b/phoenix5-spark3-it/src/it/java/org/apache/phoenix/spark/OrderByIT.java
@@ -29,6 +29,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -284,6 +285,72 @@
}
@Test
+ public void testCombinationOfOrAndFilters() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String tableName1 = generateUniqueName();
+ String ddl = "CREATE TABLE " + tableName1 +
+ " (ENTITY_INSTANCE_ID BIGINT NOT NULL, MEMBER_ID VARCHAR(50) NOT NULL, CASE_ID VARCHAR(250) NOT NULL," +
+ " CANCELLATION_FLAG VARCHAR(1), CASE_MATCH_TYPE CHAR(1), CONSTRAINT PK1 PRIMARY KEY(ENTITY_INSTANCE_ID, " +
+ "MEMBER_ID, CASE_ID))\n";
+ createTestTable(getUrl(), ddl);
+ SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+ Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+ .option("table", tableName1)
+ .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+ phoenixDataSet.createOrReplaceTempView(tableName1);
+ String dml = "UPSERT INTO " + tableName1 + " VALUES(?,?,?,?,?)";
+ PreparedStatement stmt = conn.prepareStatement(dml);
+ stmt.setInt(1, 40);
+ stmt.setString(2, "a");
+ stmt.setString(3, "b");
+ stmt.setString(4, "Y");
+ stmt.setString(5, "M");
+ stmt.execute();
+ stmt.setInt(1, 40);
+ stmt.setString(2, "c");
+ stmt.setString(3, "d");
+ stmt.setNull(4, Types.VARCHAR);
+ stmt.setString(5, "M");
+ stmt.execute();
+ stmt.setInt(1, 40);
+ stmt.setString(2, "e");
+ stmt.setString(3, "f");
+ stmt.setString(4, "N");
+ stmt.setString(5, "M");
+ stmt.execute();
+ stmt.setInt(1, 41);
+ stmt.setString(2, "f");
+ stmt.setString(3, "g");
+ stmt.setString(4, "N");
+ stmt.setString(5, "C");
+ stmt.execute();
+ conn.commit();
+ String query =
+ "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and " +
+ "(CANCELLATION_FLAG <> 'Y' OR CANCELLATION_FLAG IS NULL ) and CASE_MATCH_TYPE='M'";
+ Dataset<Row> dataset =
+ sqlContext.sql(query);
+ List<Row> rows = dataset.collectAsList();
+ ResultSet rs = new SparkResultSet(rows, dataset.columns());
+ assertTrue(rs.next());
+ assertEquals(2, rs.getLong(1));
+ assertFalse(rs.next());
+ query =
+ "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and CASE_MATCH_TYPE='M' " +
+ " OR CANCELLATION_FLAG <> 'Y'";
+ dataset =
+ sqlContext.sql(query);
+ rows = dataset.collectAsList();
+ rs = new SparkResultSet(rows, dataset.columns());
+ assertTrue(rs.next());
+ assertEquals(4, rs.getLong(1));
+ assertFalse(rs.next());
+ }
+ }
+
+ @Test
public void testOrderByWithExpression() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
diff --git a/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala b/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
index 8b9b481..77c172a 100644
--- a/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
+++ b/phoenix5-spark3/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
@@ -66,7 +66,7 @@
val(whereLeftClause, leftUnsupportedFilters, _) = pushFilters(Array(leftFilter))
val(whereRightClause, rightUnsupportedFilters, _) = pushFilters(Array(rightFilter))
if (leftUnsupportedFilters.isEmpty && rightUnsupportedFilters.isEmpty) {
- filter.append(whereLeftClause + " OR " + whereRightClause)
+ filter.append("(" + whereLeftClause + " OR " + whereRightClause + ")")
}
else {
unsupportedFilters :+ f