[CALCITE-3935] Enhance Join-Materialization, support to pull-up filters under join of left or right (xurenhe)
diff --git a/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java b/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java
index e0f78b0..60a9713 100644
--- a/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java
+++ b/core/src/main/java/org/apache/calcite/plan/SubstitutionVisitor.java
@@ -1212,15 +1212,13 @@
 
       final RexBuilder rexBuilder = call.getCluster().getRexBuilder();
 
-      // Try pulling up MutableCalc only when:
-      // 1. it's inner join.
-      // 2. it's outer join but no filtering condition from MutableCalc.
+      // Check whether is same join type.
       final JoinRelType joinRelType = sameJoinType(query.joinType, target.joinType);
       if (joinRelType == null) {
         return null;
       }
-      if (joinRelType != JoinRelType.INNER
-          && !(joinRelType.isOuterJoin() && qInput0Cond.isAlwaysTrue())) {
+      // Check if filter under join can be pulled up.
+      if (!canPullUpFilterUnderJoin(joinRelType, qInput0Cond, null)) {
         return null;
       }
       // Try pulling up MutableCalc only when Join condition references mapping.
@@ -1298,15 +1296,13 @@
 
       final RexBuilder rexBuilder = call.getCluster().getRexBuilder();
 
-      // Try pulling up MutableCalc only when:
-      // 1. it's inner join.
-      // 2. it's outer join but no filtering condition from MutableCalc.
+      // Check whether is same join type.
       final JoinRelType joinRelType = sameJoinType(query.joinType, target.joinType);
       if (joinRelType == null) {
         return null;
       }
-      if (joinRelType != JoinRelType.INNER
-          && !(joinRelType.isOuterJoin() && qInput1Cond.isAlwaysTrue())) {
+      // Check if filter under join can be pulled up.
+      if (!canPullUpFilterUnderJoin(joinRelType, null, qInput1Cond)) {
         return null;
       }
       // Try pulling up MutableCalc only when Join condition references mapping.
@@ -1389,17 +1385,13 @@
 
       final RexBuilder rexBuilder = call.getCluster().getRexBuilder();
 
-      // Try pulling up MutableCalc only when:
-      // 1. it's inner join.
-      // 2. it's outer join but no filtering condition from MutableCalc.
+      // Check whether is same join type.
       final JoinRelType joinRelType = sameJoinType(query.joinType, target.joinType);
       if (joinRelType == null) {
         return null;
       }
-      if (joinRelType != JoinRelType.INNER
-          && !(joinRelType.isOuterJoin()
-              && qInput0Cond.isAlwaysTrue()
-              && qInput1Cond.isAlwaysTrue())) {
+      // Check if filter under join can be pulled up.
+      if (!canPullUpFilterUnderJoin(joinRelType, qInput0Cond, qInput1Cond)) {
         return null;
       }
       if (!referenceByMapping(query.condition, qInput0Projs, qInput1Projs)) {
@@ -2085,6 +2077,34 @@
     return RelOptUtil.equal(desc0, rel0.rowType, desc1, rel1.rowType, litmus);
   }
 
+  /**
+   * Check if filter under join can be pulled up,
+   * when meeting JoinOnCalc of query unify to Join of target.
+   * Working in rules: {@link JoinOnLeftCalcToJoinUnifyRule} <br/>
+   * {@link JoinOnRightCalcToJoinUnifyRule} <br/>
+   * {@link JoinOnCalcsToJoinUnifyRule} <br/>
+   */
+  private static boolean canPullUpFilterUnderJoin(JoinRelType joinType,
+      @Nullable RexNode leftFilterRexNode, @Nullable RexNode rightFilterRexNode) {
+    if (joinType == JoinRelType.INNER) {
+      return true;
+    }
+    if (joinType == JoinRelType.LEFT
+        && (rightFilterRexNode == null || rightFilterRexNode.isAlwaysTrue())) {
+      return true;
+    }
+    if (joinType == JoinRelType.RIGHT
+        && (leftFilterRexNode == null || leftFilterRexNode.isAlwaysTrue())) {
+      return true;
+    }
+    if (joinType == JoinRelType.FULL
+        && ((rightFilterRexNode == null || rightFilterRexNode.isAlwaysTrue())
+        && (leftFilterRexNode == null || leftFilterRexNode.isAlwaysTrue()))) {
+      return true;
+    }
+    return false;
+  }
+
   /** Operand to a {@link UnifyRule}. */
   protected abstract static class Operand {
     protected final Class<? extends MutableRel> clazz;
diff --git a/core/src/test/java/org/apache/calcite/test/MaterializedViewSubstitutionVisitorTest.java b/core/src/test/java/org/apache/calcite/test/MaterializedViewSubstitutionVisitorTest.java
index bed699f..e284755 100644
--- a/core/src/test/java/org/apache/calcite/test/MaterializedViewSubstitutionVisitorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/MaterializedViewSubstitutionVisitorTest.java
@@ -1041,6 +1041,90 @@
         + "where \"name\" = 'hello'";
     sql(mv, query).ok();
   }
+  /** Unit test for FilterBottomJoin can be pulled up. */
+  @Test void testLeftFilterOnLeftJoinToJoinOk1() {
+    String mv = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\") \"t1\"\n"
+        + "left join (select \"deptno\", \"name\" from \"depts\") \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    String query = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\" where \"empid\" > 10) \"t1\"\n"
+        + "left join (select \"deptno\", \"name\" from \"depts\") \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    sql(mv, query).ok();
+  }
+
+  @Test void testLeftFilterOnLeftJoinToJoinOk2() {
+    String mv = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\" where \"empid\" > 10) \"t1\"\n"
+        + "left join (select \"deptno\", \"name\" from \"depts\") \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    String query = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\" where \"empid\" > 30) \"t1\"\n"
+        + "left join (select \"deptno\", \"name\" from \"depts\") \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    sql(mv, query).ok();
+  }
+
+  @Test void testRightFilterOnLeftJoinToJoinFail() {
+    String mv = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\") \"t1\"\n"
+        + "left join (select \"deptno\", \"name\" from \"depts\") \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    String query = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\") \"t1\"\n"
+        + "left join (select \"deptno\", \"name\" from \"depts\" where \"name\" is not null) \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    sql(mv, query).noMat();
+  }
+
+  @Test void testRightFilterOnRightJoinToJoinOk() {
+    String mv = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\") \"t1\"\n"
+        + "right join (select \"deptno\", \"name\" from \"depts\") \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    String query = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\") \"t1\"\n"
+        + "right join (select \"deptno\", \"name\" from \"depts\" where \"name\" is not null) \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    sql(mv, query).ok();
+  }
+
+  @Test void testLeftFilterOnRightJoinToJoinFail() {
+    String mv = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\") \"t1\"\n"
+        + "right join (select \"deptno\", \"name\" from \"depts\") \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    String query = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\" where \"empid\" > 30) \"t1\"\n"
+        + "right join (select \"deptno\", \"name\" from \"depts\") \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    sql(mv, query).noMat();
+  }
+
+  @Test void testLeftFilterOnFullJoinToJoinFail() {
+    String mv = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\") \"t1\"\n"
+        + "full join (select \"deptno\", \"name\" from \"depts\") \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    String query = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\" where \"empid\" > 30) \"t1\"\n"
+        + "full join (select \"deptno\", \"name\" from \"depts\") \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    sql(mv, query).noMat();
+  }
+
+  @Test void testRightFilterOnFullJoinToJoinFail() {
+    String mv = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\") \"t1\"\n"
+        + "full join (select \"deptno\", \"name\" from \"depts\") \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    String query = "select * from \n"
+        + "(select \"empid\", \"deptno\", \"name\" from \"emps\") \"t1\"\n"
+        + "full join (select \"deptno\", \"name\" from \"depts\" where \"name\" is not null) \"t2\"\n"
+        + "on \"t1\".\"deptno\" = \"t2\".\"deptno\"";
+    sql(mv, query).noMat();
+  }
 
   @Test void testMoreSameExprInMv() {
     final String mv = ""