DRILL-8245: Project pushdown depends on rules order and might not happen
diff --git a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdParser.java b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdParser.java
index 4aec836..19a0b35 100644
--- a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdParser.java
+++ b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdParser.java
@@ -33,10 +33,10 @@
 import nl.basjes.parse.core.exceptions.InvalidDissectorException;
 import nl.basjes.parse.core.exceptions.MissingDissectorsException;
 import nl.basjes.parse.httpdlog.HttpdLoglineParser;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -190,13 +190,7 @@
     if (!isStarQuery() &&
         !isMetadataQuery() &&
         !isOnlyImplicitColumns()) {
-      List<String> keysToRemove = new ArrayList<>();
-      for (final String key : requestedPaths.keySet()) {
-        if (!isRequested(key)) {
-          keysToRemove.add(key);
-        }
-      }
-      keysToRemove.forEach( key -> requestedPaths.remove(key));
+      requestedPaths = getRequestedColumnPaths();
     }
 
     EnumSet<Casts> allCasts;
@@ -254,6 +248,25 @@
     return builder.build();
   }
 
+  private Map<String, String> getRequestedColumnPaths() {
+    Map<String, String> requestedColumnPaths = new TreeMap<>();
+    for (SchemaPath requestedColumn : requestedColumns) {
+      String columnName = requestedColumn.getRootSegmentPath();
+      String parserPath = requestedPaths.get(columnName);
+      if (parserPath != null) {
+        requestedColumnPaths.put(columnName, parserPath);
+      } else {
+        requestedPaths.keySet()
+          .stream()
+          .filter(colName -> colName.endsWith(HttpdUtils.SAFE_WILDCARD)
+            && requestedColumn.rootName().startsWith(colName.substring(0, colName.length() - HttpdUtils.SAFE_WILDCARD.length())))
+          .findAny()
+          .ifPresent(colName -> requestedColumnPaths.put(colName, requestedPaths.get(colName)));
+      }
+    }
+    return requestedColumnPaths;
+  }
+
   public void addFieldsToParser(RowSetLoader rowWriter) {
     for (final Map.Entry<String, String> entry : requestedPaths.entrySet()) {
       try {
@@ -266,24 +279,14 @@
   }
 
   public boolean isStarQuery() {
-    return requestedColumns.size() == 1 && requestedColumns.get(0).isDynamicStar();
+    return requestedColumns.stream()
+      .anyMatch(SchemaPath::isDynamicStar);
   }
 
   public boolean isMetadataQuery() {
     return requestedColumns.size() == 0;
   }
 
-  public boolean isRequested(String colName) {
-    for (SchemaPath path : requestedColumns) {
-      if (path.isDynamicStar()) {
-        return true;
-      } else if (path.nameEquals(colName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   /*
   This is for the edge case where a query only contains the implicit fields.
    */
diff --git a/contrib/format-httpd/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReader.java b/contrib/format-httpd/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReader.java
index 877cff9..ee6be4b 100644
--- a/contrib/format-httpd/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReader.java
+++ b/contrib/format-httpd/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReader.java
@@ -118,6 +118,12 @@
       "\\\"%{User-agent}i\\\"', " +
       "flattenWildcards => true)) WHERE `request_firstline_original_uri_query_came__from` IS NOT NULL";
 
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("columns=\\[`request_firstline_original_uri_query_came__from`\\]")
+      .match();
+
     RowSet results = client.queryBuilder().sql(sql).rowSet();
 
     TupleMetadata expectedSchema = new SchemaBuilder()
diff --git a/contrib/format-httpd/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReaderUserAgent.java b/contrib/format-httpd/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReaderUserAgent.java
index 5edcc2b..131ea48 100644
--- a/contrib/format-httpd/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReaderUserAgent.java
+++ b/contrib/format-httpd/src/test/java/org/apache/drill/exec/store/httpd/TestHTTPDLogReaderUserAgent.java
@@ -199,8 +199,6 @@
 
     RowSet results = client.queryBuilder().sql(sql).rowSet();
 
-    results.print();
-
     TupleMetadata expectedSchema = new SchemaBuilder()
             .addNullable("request_receive_time_epoch",                          MinorType.TIMESTAMP)
             .addNullable("request_user-agent",                                  MinorType.VARCHAR)
@@ -255,8 +253,4 @@
     RowSetUtilities.verify(expected, results);
   }
 
-
-
 }
-
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index fe99686..01afb1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -323,7 +323,8 @@
       /*
        Filter push-down related rules
        */
-      DrillPushFilterPastProjectRule.INSTANCE,
+      DrillPushFilterPastProjectRule.LOGICAL,
+      DrillPushFilterPastProjectRule.DRILL_INSTANCE,
       // Due to infinite loop in planning (DRILL-3257/CALCITE-1271), temporarily use this rule in Hep planner
       // RuleInstance.FILTER_SET_OP_TRANSPOSE_RULE,
       DrillFilterAggregateTransposeRule.INSTANCE,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
index 5ea1d58..146d0f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner.logical;
 
+import org.apache.drill.exec.planner.common.DrillProjectRelBase;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
@@ -39,7 +40,10 @@
 
 public class DrillPushFilterPastProjectRule extends RelOptRule {
 
-  public final static RelOptRule INSTANCE = new DrillPushFilterPastProjectRule(DrillRelFactories.LOGICAL_BUILDER);
+  public final static RelOptRule LOGICAL = new DrillPushFilterPastProjectRule(
+    LogicalFilter.class, LogicalProject.class, DrillRelFactories.LOGICAL_BUILDER, "DrillPushFilterPastProjectRule:logical");
+  public final static RelOptRule DRILL_INSTANCE = new DrillPushFilterPastProjectRule(
+    DrillFilterRel.class, DrillProjectRelBase.class, DrillRelFactories.LOGICAL_BUILDER, "DrillPushFilterPastProjectRule:drill_logical");
 
   private static final Collection<String> BANNED_OPERATORS;
 
@@ -49,8 +53,9 @@
     BANNED_OPERATORS.add("item");
   }
 
-  private DrillPushFilterPastProjectRule(RelBuilderFactory relBuilderFactory) {
-    super(operand(LogicalFilter.class, operand(LogicalProject.class, any())), relBuilderFactory,null);
+  private DrillPushFilterPastProjectRule(Class<? extends Filter> filter,
+    Class<? extends Project> project, RelBuilderFactory relBuilderFactory, String description) {
+    super(operand(filter, operand(project, any())), relBuilderFactory,description);
   }
 
   //~ Methods ----------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index bcd9792..61f06e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -60,9 +60,7 @@
 
   public DrillScanRel(final RelOptCluster cluster, final RelTraitSet traits,
                       final RelOptTable table, boolean partitionFilterPushdown) {
-    // By default, scan does not support project pushdown.
-    // Decision whether push projects into scan will be made solely in DrillPushProjIntoScanRule.
-    this(cluster, traits, table, table.getRowType(), getProjectedColumns(table, true), partitionFilterPushdown);
+    this(cluster, traits, table, table.getRowType(), getProjectedColumns(table, false), partitionFilterPushdown);
     this.settings = PrelUtil.getPlannerSettings(cluster.getPlanner());
   }
 
@@ -139,7 +137,9 @@
   @Override
   public RelOptCost computeSelfCost(final RelOptPlanner planner, RelMetadataQuery mq) {
     final ScanStats stats = getGroupScan().getScanStats(settings);
-    int columnCount = Utilities.isStarQuery(columns) ? STAR_COLUMN_COST : getRowType().getFieldCount();
+    double columnCount = Utilities.isStarQuery(columns)
+      ? STAR_COLUMN_COST
+      : Math.pow(getRowType().getFieldCount(), 2) / Math.max(columns.size(), 1);
 
     // double rowCount = RelMetadataQuery.getRowCount(this);
     double rowCount = Math.max(1, stats.getRecordCount());