DRILL-8209: Introduce rule for converting join with distinct input to semi-join (#2533)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index aa61c34..fe99686 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.planner;
 
 import org.apache.drill.exec.planner.logical.ConvertMetadataAggregateToDirectScanRule;
+import org.apache.drill.exec.planner.logical.DrillDistinctJoinToSemiJoinRule;
 import org.apache.drill.exec.planner.physical.MetadataAggPrule;
 import org.apache.drill.exec.planner.physical.MetadataControllerPrule;
 import org.apache.drill.exec.planner.physical.MetadataHandlerPrule;
@@ -403,6 +404,8 @@
     if (optimizerRulesContext.getPlannerSettings().isHashJoinEnabled() &&
         optimizerRulesContext.getPlannerSettings().isSemiJoinEnabled()) {
       basicRules.add(RuleInstance.SEMI_JOIN_PROJECT_RULE);
+      basicRules.add(DrillDistinctJoinToSemiJoinRule.INSTANCE);
+      basicRules.add(RuleInstance.JOIN_TO_SEMI_JOIN_RULE);
     }
 
     return RuleSets.ofList(basicRules.build());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index 5867766..bbcd075 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -72,6 +72,20 @@
     }
   };
 
+  /**
+   * Rule for converting a join to a semi-join, created with
+   * {@link DrillRelFactories#LOGICAL_BUILDER}. It does not match joins whose condition
+   * is always {@code true} or always {@code false}.
+   */
+  SemiJoinRule JOIN_TO_SEMI_JOIN_RULE = new SemiJoinRule.JoinToSemiJoinRule(Join.class, Aggregate.class,
+    DrillRelFactories.LOGICAL_BUILDER, "DrillJoinToSemiJoinRule") {
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      Join join = call.rel(0);
+      return !join.getCondition().isAlwaysTrue() && !join.getCondition().isAlwaysFalse();
+    }
+  };
+
   JoinPushExpressionsRule JOIN_PUSH_EXPRESSIONS_RULE =
       new JoinPushExpressionsRule(Join.class,
           DrillRelFactories.LOGICAL_BUILDER);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillDistinctJoinToSemiJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillDistinctJoinToSemiJoinRule.java
new file mode 100644
index 0000000..9b63ae4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillDistinctJoinToSemiJoinRule.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.physical.impl.join.JoinUtils;
+
+/**
+ * Converts an inner join with distinct right input to semi-join.
+ * <p>
+ * The rule matches a {@link Project} on top of a {@link Join} and replaces the join
+ * with a semi-join when {@link #matches(RelOptRuleCall)} accepts the pair.
+ */
+public class DrillDistinctJoinToSemiJoinRule extends RelOptRule {
+  public static final RelOptRule INSTANCE = new DrillDistinctJoinToSemiJoinRule();
+
+  public DrillDistinctJoinToSemiJoinRule() {
+    super(RelOptHelper.any(Project.class, Join.class),
+      DrillRelFactories.LOGICAL_BUILDER, "DrillDistinctJoinToSemiJoinRule");
+  }
+
+  /**
+   * Checks whether the matched project and join can be rewritten to use a semi-join.
+   *
+   * @param call rule call with the {@link Project} as operand 0 and the {@link Join} as operand 1
+   * @return {@code true} only if every condition listed in the method body holds
+   */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    RelMetadataQuery mq = call.getMetadataQuery();
+    Project project = call.rel(0);
+    Join join = call.rel(1);
+    // outer joins also return rows that have no match in the other input,
+    // a semi-join would drop them, so only inner joins may be converted
+    if (join.getJoinType() != JoinRelType.INNER) {
+      return false;
+    }
+    ImmutableBitSet bits = RelOptUtil.InputFinder.bits(project.getProjects(), null);
+    ImmutableBitSet rightBits = ImmutableBitSet.range(
+      join.getLeft().getRowType().getFieldCount(),
+      join.getRowType().getFieldCount());
+    JoinInfo joinInfo = join.analyzeCondition();
+    // can convert to semi-join if all of these are true
+    // - non-cartesian join
+    // - projecting only columns from left input
+    // - join has only equality conditions
+    // - all columns in condition from the right input are unique
+    return !JoinUtils.checkCartesianJoin(join)
+      && !bits.intersects(rightBits)
+      && joinInfo.isEqui()
+      && SqlFunctions.isTrue(mq.areColumnsUnique(join.getRight(), joinInfo.rightSet()));
+  }
+
+  /**
+   * Replaces the matched join with a semi-join over the same inputs and condition,
+   * and re-applies the original projection expressions on top of it.
+   *
+   * @param call rule call with the {@link Project} as operand 0 and the {@link Join} as operand 1
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Project project = call.rel(0);
+    Join join = call.rel(1);
+    RelBuilder relBuilder = call.builder();
+    RelNode relNode = relBuilder.push(join.getLeft())
+      .push(join.getRight())
+      .semiJoin(join.getCondition())
+      .project(project.getProjects())
+      .build();
+    call.transformTo(relNode);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
index 2ad751c..8861f4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
@@ -24,6 +24,7 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMdUtil;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.Pair;
@@ -102,9 +103,30 @@
     return new LogicalSemiJoin(leftOp, rightOp, conditions, joinType);
   }
 
-  // This method is the same as in Calcite and is here to ensure SemiJoin's behavior
+  /**
+   * Returns the cost of this semi-join as computed by {@code computeLogicalJoinCost}.
+   */
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-    return planner.getCostFactory().makeTinyCost();
+    return computeLogicalJoinCost(planner, mq);
+  }
+
+  /**
+   * Estimates the number of rows produced by this semi-join as the row count of the left
+   * input multiplied by the selectivity of the semi-join predicate on that input.
+   * <p>
+   * Both values come from the metadata query and may be {@code null}; in that case
+   * the estimate of the parent class is returned instead of failing on unboxing.
+   */
+  @Override
+  public double estimateRowCount(RelMetadataQuery mq) {
+    RexNode semiJoinSelectivity =
+      RelMdUtil.makeSemiJoinSelectivityRexNode(mq, this);
+    Double selectivity = mq.getSelectivity(getLeft(), semiJoinSelectivity);
+    Double leftRowCount = mq.getRowCount(getLeft());
+    if (selectivity == null || leftRowCount == null) {
+      return super.estimateRowCount(mq);
+    }
+    return selectivity * leftRowCount;
   }
 }