DRILL-8190: Fix mongo project pushdown for queries with joins (#2652)

diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
index 8995aab..62250b4 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
@@ -25,6 +25,7 @@
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 
@@ -50,7 +51,7 @@
     VertexDrel in = call.rel(0);
     RelNode jdbcIntermediatePrel = new JdbcIntermediatePrel(
         in.getCluster(),
-        in.getTraitSet().replace(outTrait),
+        in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
         in.getInput(0), username);
     call.transformTo(jdbcIntermediatePrel);
   }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
index b55fa6b..774a232 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
@@ -55,7 +55,6 @@
 import org.apache.drill.exec.store.plan.rel.PluginSortRel;
 import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
 import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
-import org.apache.drill.exec.util.Utilities;
 import org.bson.BsonDocument;
 import org.bson.BsonElement;
 import org.bson.BsonInt32;
@@ -220,8 +219,8 @@
   }
 
   @Override
-  public void implement(StoragePluginTableScan scan) throws IOException {
-    groupScan = (MongoGroupScan) Utilities.getDrillTable(scan.getTable()).getGroupScan();
+  public void implement(StoragePluginTableScan scan) {
+    groupScan = (MongoGroupScan) scan.getGroupScan();
     operations = this.groupScan.getScanSpec().getOperations().stream()
       .map(BsonDocument::parse)
       .collect(Collectors.toList());
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
index a691443..372ec6d 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
@@ -101,4 +101,26 @@
         .go();
   }
 
+  @Test // DRILL-8190
+  public void testProjectWithJoin() throws Exception {
+    String query = "SELECT sum(s1.sales) s1_sales,\n" +
+      "sum(s2.sales) s2_sales\n" +
+      "FROM mongo.%s.`%s` s1\n" +
+      "JOIN mongo.%s.`%s` s2 ON s1._id = s2._id";
+
+    queryBuilder()
+      .sql(query, DONUTS_DB, DONUTS_COLLECTION, DONUTS_DB, DONUTS_COLLECTION)
+      .planMatcher()
+      .include("columns=\\[`_id`, `sales`]")
+      .exclude("columns=\\[`\\*\\*`")
+      .match();
+
+    testBuilder()
+      .sqlQuery(query, DONUTS_DB, DONUTS_COLLECTION, DONUTS_DB, DONUTS_COLLECTION)
+      .unOrdered()
+      .baselineColumns("s1_sales", "s2_sales")
+      .baselineValues(1194L, 1194L)
+      .go();
+  }
+
 }
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixIntermediatePrelConverterRule.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixIntermediatePrelConverterRule.java
index c5eaaf1..7d6a88d 100644
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixIntermediatePrelConverterRule.java
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixIntermediatePrelConverterRule.java
@@ -25,6 +25,7 @@
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 
@@ -48,7 +49,7 @@
     VertexDrel in = call.rel(0);
     RelNode intermediatePrel = new PhoenixIntermediatePrel(
       in.getCluster(),
-      in.getTraitSet().replace(outTrait),
+      in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
       in.getInput(0));
     call.transformTo(intermediatePrel);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
index 25c2fe2..cdfeb6c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
@@ -30,6 +30,7 @@
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.metadata.BuiltInMetadata;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdDistinctRowCount;
 import org.apache.calcite.rel.metadata.RelMdUtil;
@@ -42,7 +43,6 @@
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.planner.common.DrillJoinRelBase;
@@ -61,12 +61,9 @@
 public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount{
   private static final Logger logger = LoggerFactory.getLogger(DrillRelMdDistinctRowCount.class);
 
-  private static final DrillRelMdDistinctRowCount INSTANCE =
-      new DrillRelMdDistinctRowCount();
-
   public static final RelMetadataProvider SOURCE =
-      ReflectiveRelMetadataProvider.reflectiveSource(
-          BuiltInMethod.DISTINCT_ROW_COUNT.method, INSTANCE);
+    ReflectiveRelMetadataProvider.reflectiveSource(
+      new DrillRelMdDistinctRowCount(), BuiltInMetadata.DistinctRowCount.Handler.class);
 
   /**
    * We need to override this method since Calcite and Drill calculate
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdMaxRowCount.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdMaxRowCount.java
index ed96025..09e555c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdMaxRowCount.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdMaxRowCount.java
@@ -18,18 +18,17 @@
 package org.apache.drill.exec.planner.cost;
 
 import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.metadata.BuiltInMetadata;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdMaxRowCount;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.BuiltInMethod;
 
 public class DrillRelMdMaxRowCount extends RelMdMaxRowCount {
 
-  private static final DrillRelMdMaxRowCount INSTANCE = new DrillRelMdMaxRowCount();
-
   public static final RelMetadataProvider SOURCE =
-      ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.MAX_ROW_COUNT.method, INSTANCE);
+    ReflectiveRelMetadataProvider.reflectiveSource(
+      new DrillRelMdMaxRowCount(), BuiltInMetadata.MaxRowCount.Handler.class);
 
   // The method is overriden because of changes done in CALCITE-2991 and
   // TODO: should be discarded when CALCITE-1048 is fixed.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
index eaaf7d1..2f59ab3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
@@ -23,11 +23,12 @@
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.metadata.BuiltInMetadata;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdRowCount;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.schema.Table;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.planner.common.DrillLimitRelBase;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
@@ -40,10 +41,12 @@
 import org.apache.drill.metastore.statistics.TableStatisticsKind;
 
 
-public class DrillRelMdRowCount extends RelMdRowCount{
-  private static final DrillRelMdRowCount INSTANCE = new DrillRelMdRowCount();
+public class DrillRelMdRowCount extends RelMdRowCount {
 
-  public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.ROW_COUNT.method, INSTANCE);
+  public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(
+    new DrillRelMdRowCount(), BuiltInMetadata.RowCount.Handler.class);
+
+  private static final Double DEFAULT_SCAN_ROW_COUNT = 1e9;
 
   @Override
   public Double getRowCount(Aggregate rel, RelMetadataQuery mq) {
@@ -96,7 +99,14 @@
     PlannerSettings settings = PrelUtil.getSettings(rel.getCluster());
     // If guessing, return selectivity from RelMDRowCount
     if (DrillRelOptUtil.guessRows(rel)) {
-      return super.getRowCount(rel, mq);
+      if (rel instanceof DrillScanRelBase
+        || rel.getTable().unwrap(Table.class).getStatistic().getRowCount() != null) {
+        return super.getRowCount(rel, mq);
+      } else {
+        // if table doesn't have row count statistics, return large row count
+        // to make sure that limit will be pushed down
+        return DEFAULT_SCAN_ROW_COUNT;
+      }
     }
     // Return rowcount from statistics, if available. Otherwise, delegate to parent.
     try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java
index 5732f91..bda0b16 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java
@@ -33,6 +33,7 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.BuiltInMetadata;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdSelectivity;
 import org.apache.calcite.rel.metadata.RelMdUtil;
@@ -46,7 +47,6 @@
 import org.apache.calcite.rex.RexVisitor;
 import org.apache.calcite.rex.RexVisitorImpl;
 import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.Util;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.DbGroupScan;
@@ -71,8 +71,8 @@
 public class DrillRelMdSelectivity extends RelMdSelectivity {
   private static final Logger logger = LoggerFactory.getLogger(DrillRelMdSelectivity.class);
 
-  private static final DrillRelMdSelectivity INSTANCE = new DrillRelMdSelectivity();
-  public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.SELECTIVITY.method, INSTANCE);
+  public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(
+    new DrillRelMdSelectivity(), BuiltInMetadata.Selectivity.Handler.class);
   /*
    * For now, we are treating all LIKE predicates to have the same selectivity irrespective of the number or position
    * of wildcard characters (%). This is no different than the present Drill/Calcite behaviour w.r.t to LIKE predicates.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java
index 28aec71..b91e4cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java
@@ -18,10 +18,12 @@
 package org.apache.drill.exec.store.enumerable;
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonTypeResolver;
 import org.apache.drill.exec.record.ColumnConverterFactory;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
+@JsonTypeResolver(DynamicTypeResolverBuilder.class)
 public interface ColumnConverterFactoryProvider {
 
   ColumnConverterFactory getFactory(TupleMetadata schema);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DynamicTypeResolverBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DynamicTypeResolverBuilder.java
new file mode 100644
index 0000000..64fa986
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DynamicTypeResolverBuilder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.enumerable;
+
+import com.fasterxml.jackson.databind.DeserializationConfig;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.jsontype.TypeDeserializer;
+import com.fasterxml.jackson.databind.jsontype.impl.StdTypeResolverBuilder;
+import org.reflections.Reflections;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class DynamicTypeResolverBuilder extends StdTypeResolverBuilder {
+
+  @Override
+  public TypeDeserializer buildTypeDeserializer(DeserializationConfig config,
+    JavaType baseType, Collection<NamedType> subtypes) {
+
+    Reflections reflections = new Reflections("org.apache.drill.exec.store");
+    @SuppressWarnings("unchecked")
+    Class<Object> rawClass = (Class<Object>) baseType.getRawClass();
+    List<NamedType> dynamicSubtypes = reflections.getSubTypesOf(rawClass).stream()
+      .map(NamedType::new)
+      .collect(Collectors.toList());
+    dynamicSubtypes.addAll(subtypes);
+
+    return super.buildTypeDeserializer(config, baseType, dynamicSubtypes);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java
index 2dec45a..9930484 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java
@@ -61,7 +61,7 @@
     builder.providedSchema(subScan.getSchema());
 
     ManagedReader<SchemaNegotiator> reader = new EnumerableRecordReader(subScan.getColumns(),
-        subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath(), subScan.factoryProvider());
+        subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath(), subScan.getConverterFactoryProvider());
     ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(Collections.singletonList(reader).iterator());
     builder.setReaderFactory(readerFactory);
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java
index 4476be8..0c7245b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java
@@ -79,7 +79,7 @@
     return schemaPath;
   }
 
-  public ColumnConverterFactoryProvider factoryProvider() {
+  public ColumnConverterFactoryProvider getConverterFactoryProvider() {
     return converterFactoryProvider;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerableIntermediatePrelConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerableIntermediatePrelConverterRule.java
index 7272a36..c5ede3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerableIntermediatePrelConverterRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerableIntermediatePrelConverterRule.java
@@ -25,6 +25,7 @@
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
 
 public class EnumerableIntermediatePrelConverterRule extends RelOptRule {
@@ -48,7 +49,7 @@
     VertexDrel in = call.rel(0);
     RelNode intermediatePrel = new EnumerableIntermediatePrel(
         in.getCluster(),
-        in.getTraitSet().replace(outTrait),
+        in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
         in.getInput(0),
         context);
     call.transformTo(intermediatePrel);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java
index edbc591..d202ebd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java
@@ -18,15 +18,22 @@
 package org.apache.drill.exec.store.enumerable.plan;
 
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.exec.planner.logical.DrillImplementor;
 import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.util.Utilities;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.List;
 
+import static org.apache.drill.exec.planner.logical.DrillScanRel.STAR_COLUMN_COST;
+
 /**
  * The vertex simply holds the child nodes but contains its own traits.
  * Used for completing Drill logical planning when child nodes have some specific traits.
@@ -51,4 +58,15 @@
   public LogicalOperator implement(DrillImplementor implementor) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    double rowCount = estimateRowCount(mq);
+    double columnCount = Utilities.isStarQuery(getRowType()) ? STAR_COLUMN_COST : getRowType().getFieldCount();
+    double valueCount = rowCount * columnCount;
+    // columns count is considered during cost calculation to make preferable plans
+    // with pushed plugin project operators since in the opposite case planner wouldn't consider
+    // a plan with additional plugin projection that reduces columns as better than a plan without it
+    return planner.getCostFactory().makeCost(rowCount, valueCount, 0).multiplyBy(0.1);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
index 02885e9..3c7f115 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
@@ -51,7 +51,7 @@
 
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+    return super.computeLogicalAggCost(planner, mq).multiplyBy(0.1);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
index 2be69ef..38525e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.store.plan.rel;
 
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelWriter;
@@ -27,11 +29,14 @@
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.common.DrillScanRelBase;
 import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.util.Utilities;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.drill.exec.planner.logical.DrillScanRel.STAR_COLUMN_COST;
+
 /**
  * Storage plugin table scan rel implementation.
  */
@@ -75,6 +80,21 @@
     return implementor.canImplement(this);
   }
 
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    List<SchemaPath> columns = groupScan.getColumns();
+    // column count should be adjusted to consider the case of projecting nested columns,
+    // such a scan should be preferable compared to the scan where root columns are projected only
+    double columnCount = Utilities.isStarQuery(columns)
+      ? STAR_COLUMN_COST
+      : Math.pow(getRowType().getFieldCount(), 2) / Math.max(columns.size(), 1);
+
+    double rowCount = estimateRowCount(mq);
+    double valueCount = rowCount * columnCount;
+
+    return planner.getCostFactory().makeCost(rowCount, valueCount, 0).multiplyBy(0.1);
+  }
+
   private static List<SchemaPath> getColumns(RelDataType rowType) {
     return rowType.getFieldList().stream()
       .map(filed -> filed.isDynamicStar()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
index 279241e..a13dc25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
@@ -25,6 +25,7 @@
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 import org.apache.drill.exec.store.plan.PluginImplementor;
@@ -53,7 +54,7 @@
     VertexDrel in = call.rel(0);
     RelNode intermediatePrel = new PluginIntermediatePrel(
         in.getCluster(),
-        in.getTraitSet().replace(outTrait),
+        in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
         in.getInput(0),
         implementorFactory);
     call.transformTo(intermediatePrel);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
index 87f2201..1d160d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
@@ -21,6 +21,7 @@
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
@@ -33,8 +34,6 @@
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.base.Predicate;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
 
 public class Utilities {
 
@@ -52,17 +51,13 @@
     int majorFragmentId = handle.getMajorFragmentId();
     int minorFragmentId = handle.getMinorFragmentId();
 
-    String fileName = String.format("%s//%s_%s_%s_%s", location, qid, majorFragmentId, minorFragmentId, tag);
-
-    return fileName;
+    return String.format("%s//%s_%s_%s_%s", location, qid, majorFragmentId, minorFragmentId, tag);
   }
 
   /**
    * Create {@link org.apache.drill.exec.proto.BitControl.QueryContextInformation} with given <i>defaultSchemaName</i>. Rest of the members of the
    * QueryContextInformation is derived from the current state of the process.
    *
-   * @param defaultSchemaName
-   * @param sessionId
    * @return A {@link org.apache.drill.exec.proto.BitControl.QueryContextInformation} with given <i>defaultSchemaName</i>.
    */
   public static QueryContextInformation createQueryContextInfo(final String defaultSchemaName,
@@ -82,22 +77,25 @@
    * @return The Drill version.
    */
   public static String getDrillVersion() {
-      String v = Utilities.class.getPackage().getImplementationVersion();
-      return v;
+    return Utilities.class.getPackage().getImplementationVersion();
   }
 
   /**
    * Return true if list of schema path has star column.
-   * @param projected
+   *
    * @return True if the list of {@link org.apache.drill.common.expression.SchemaPath}s has star column.
    */
   public static boolean isStarQuery(Collection<SchemaPath> projected) {
-    return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() {
-      @Override
-      public boolean apply(SchemaPath path) {
-        return Preconditions.checkNotNull(path).equals(SchemaPath.STAR_COLUMN);
-      }
-    }).isPresent();
+    return Preconditions.checkNotNull(projected, COL_NULL_ERROR).stream()
+      .anyMatch(SchemaPath::isDynamicStar);
+  }
+
+  /**
+   * Return true if the row type has star column.
+   */
+  public static boolean isStarQuery(RelDataType projected) {
+    return projected.getFieldNames().stream()
+      .anyMatch(SchemaPath.DYNAMIC_STAR::equals);
   }
 
   /**