Refactoring
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java
index e32397a..dbe12a1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java
@@ -32,6 +32,7 @@
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
 import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
@@ -89,13 +90,13 @@
 
     // When predicate push-down is not supported - all filters are unsupported.
     final BeamSqlTableFilter tableFilter = beamSqlTable.constructFilter(projectFilter.right);
-    if (!beamSqlTable.supportsProjects().isProjectSupported() && tableFilter instanceof DefaultTableFilter) {
+    if (!beamSqlTable.supportsProjects().isSupported() && tableFilter instanceof DefaultTableFilter) {
       // Either project or filter push-down must be supported by the IO.
       return;
     }
 
     Set<String> usedFields = new LinkedHashSet<>();
-    if (!(tableFilter instanceof DefaultTableFilter) && !beamSqlTable.supportsProjects().isProjectSupported()) {
+    if (!(tableFilter instanceof DefaultTableFilter) && !beamSqlTable.supportsProjects().isSupported()) {
       // When applying standalone filter push-down all fields must be project by an IO.
       // With a single exception: Calc projects all fields (in the same order) and does nothing
       // else.
@@ -117,36 +118,17 @@
       return;
     }
 
-    boolean fieldReorderingSupported = beamSqlTable.supportsProjects().isFieldReorderingSupported();
-    //boolean fieldsProjectedInOrder = fieldsProjectedInSchemaOrder(beamSqlTable, program, calcInputRowType);
-    boolean isProjectSupported = beamSqlTable.supportsProjects().isProjectSupported();
-
     FieldAccessDescriptor resolved = FieldAccessDescriptor.withFieldNames(usedFields);
-    if (fieldReorderingSupported) {
-      // Only needs to be done when field reordering is supported.
+    if (beamSqlTable.supportsProjects().withFieldReordering()) {
+      // Only needs to be done when field reordering is supported, otherwise IO should project fields in the same order they are defined in the schema and let Calc do the reordering.
       resolved = resolved.withOrderByFieldInsertionOrder();
     }
     resolved = resolved.resolve(beamSqlTable.getSchema());
-    Schema newSchema =
-        SelectHelpers.getOutputSchema(ioSourceRel.getBeamSqlTable().getSchema(), resolved);
-    RelDataType calcInputType =
-        CalciteUtils.toCalciteRowType(newSchema, ioSourceRel.getCluster().getTypeFactory());
 
-    // Check if the calc can be dropped:
-    // 1. Calc only does projects and renames of fields in the same order when field reordering is not supported.
-    //    And
-    // 2. Predicate can be completely pushed-down to IO level.
-    //    And
-    // 3. Project push-down supported // TODO: Or all fields are projected by a Calc in the same order?
-    if (isProjectRenameOnlyProgram(program, fieldReorderingSupported)
-        && tableFilter.getNotSupported().isEmpty()
-        && (isProjectSupported || program.getProjectList().stream()
-        .map(ref -> program.getInputRowType().getFieldList().get(ref.getIndex()).getName())
-        .collect(Collectors.toList())
-        .equals(calcInputRowType.getFieldNames()))) {
+    if (canDropCalc(program, beamSqlTable.supportsProjects(), tableFilter)) {
       // Tell the optimizer to not use old IO, since the new one is better.
       call.getPlanner().setImportance(ioSourceRel, 0.0);
-      call.transformTo(ioSourceRel.copy(calc.getRowType(), newSchema.getFieldNames(), tableFilter));
+      call.transformTo(ioSourceRel.copy(calc.getRowType(), resolved.getFieldsAccessed().stream().map(FieldDescriptor::getFieldName).collect(Collectors.toList()), tableFilter));
       return;
     }
 
@@ -154,53 +136,18 @@
     // Calc contains all unsupported filters.
     // IO only projects fields utilised by a calc.
     if (tableFilter.getNotSupported().equals(projectFilter.right)
-        && usedFields.size() == ioSourceRel.getRowType().getFieldCount()) {
+        && usedFields.containsAll(ioSourceRel.getRowType().getFieldNames())) {
       return;
     }
 
-    BeamIOSourceRel newIoSourceRel =
-        ioSourceRel.copy(calcInputType, newSchema.getFieldNames(), tableFilter);
-    RelBuilder relBuilder = call.builder();
-    relBuilder.push(newIoSourceRel);
+    RelNode result = constructNodesWithPushDown(resolved, call.builder(), ioSourceRel, tableFilter, calc.getRowType(), projectFilter.left);
 
-    List<RexNode> newProjects = new ArrayList<>();
-    List<RexNode> newFilter = new ArrayList<>();
-    // Ex: let's say the original fields are (number before each element is the index):
-    // {0:unused1, 1:id, 2:name, 3:unused2},
-    // where only 'id' and 'name' are being used. Then the new calcInputType should be as follows:
-    // {0:id, 1:name}.
-    // A mapping list will contain 2 entries: {0:1, 1:2},
-    // showing how used field names map to the original fields.
-    List<Integer> mapping =
-        resolved.getFieldsAccessed().stream()
-            .map(FieldDescriptor::getFieldId)
-            .collect(Collectors.toList());
-
-    // Map filters to new RexInputRef.
-    for (RexNode filter : tableFilter.getNotSupported()) {
-      newFilter.add(reMapRexNodeToNewInputs(filter, mapping));
-    }
-    // Map projects to new RexInputRef.
-    for (RexNode project : projectFilter.left) {
-      newProjects.add(reMapRexNodeToNewInputs(project, mapping));
-    }
-
-    relBuilder.filter(newFilter);
-    relBuilder.project(
-        newProjects, calc.getRowType().getFieldNames(), true); // Always preserve named projects.
-
-    RelNode result = relBuilder.build();
-
-    if (newFilter.size() < projectFilter.right.size()
+    if (tableFilter.getNotSupported().size() <= projectFilter.right.size()
         || usedFields.size() < calcInputRowType.getFieldCount()) {
       // Smaller Calc programs are indisputably better, as well as IOs with less projected fields.
+      // We can consider something with the same number of filters.
       // Tell the optimizer not to use old Calc and IO.
       call.transformTo(result);
-      call.getPlanner().setImportance(calc, 0.0);
-      call.getPlanner().setImportance(ioSourceRel, 0.0);
-    } else if (newFilter.size() == projectFilter.right.size()) {
-      // But we can consider something with the same number of filters.
-      call.transformTo(result);
     }
   }
 
@@ -281,7 +228,7 @@
    * Calc should NOT be dropped in the following cases:
    * 1. Projected fields are manipulated (ex: 'select field1+10').
    * 2. When the same field projected more than once.
-   * 3. When an IO doesn't supports field reordering and projections in different (from schema) order.
+   * 3. When an IO does not supports field reordering and projects fields in a different (from schema) order.
    *
    * @param program A program to check.
    * @param projectReorderingSupported Whether project push-down supports field reordering.
@@ -304,4 +251,88 @@
 
     return true;
   }
+
+  /**
+   * Perform a series of checks to determine whether a Calc can be dropped.
+   * Following conditions need to be met in order for that to happen (logical AND):
+   * 1. Program should do simple projects, project each field once, and project fields in the same
+   *     order when field reordering is not supported.
+   * 2. Predicate can be completely pushed-down.
+   * 3. Project push-down is supported by the IO or all fields are projected by a Calc.
+   *
+   * @param program A {@code RexProgram} of a {@code Calc}.
+   * @param projectSupport An enum containing information about IO project push-down capabilities.
+   * @param tableFilter A class containing information about IO predicate push-down capabilities.
+   * @return True when Calc can be dropped, false otherwise.
+   */
+  private boolean canDropCalc(RexProgram program, ProjectSupport projectSupport, BeamSqlTableFilter tableFilter) {
+    RelDataType calcInputRowType = program.getInputRowType();
+
+    // Program should do simple projects, project each field once, and project fields in the same order when field reordering is not supported.
+    boolean fieldReorderingSupported = projectSupport.withFieldReordering();
+    if (!isProjectRenameOnlyProgram(program, fieldReorderingSupported)) {
+      return false;
+    }
+    // Predicate can be completely pushed-down
+    if (!tableFilter.getNotSupported().isEmpty()) {
+      return false;
+    }
+    // Project push-down is supported by the IO or all fields are projected by a Calc.
+    boolean isProjectSupported = projectSupport.isSupported();
+    boolean allFieldsProjected = program.getProjectList().stream()
+        .map(ref -> program.getInputRowType().getFieldList().get(ref.getIndex()).getName())
+        .collect(Collectors.toList())
+        .equals(calcInputRowType.getFieldNames());
+    return isProjectSupported || allFieldsProjected;
+  }
+
+  /**
+   * Construct a new {@link BeamIOSourceRel} with predicate and/or project pushed-down and a new {@code Calc} to do field reordering/field duplication/complex projects.
+   *
+   * @param resolved A descriptor of fields used by a {@code Calc}.
+   * @param relBuilder A {@code RelBuilder} for constructing {@code Project} and {@code Filter} Rel nodes with operations unsupported by the IO.
+   * @param ioSourceRel An original {@code BeamIOSourceRel} we are attempting to perform push-down for.
+   * @param tableFilter A class containing information about IO predicate push-down capabilities.
+   * @param calcDataType A Calcite output schema of an original {@code Calc}.
+   * @param calcProjects A list of projected {@code RexNode}s by a {@code Calc}.
+   * @return An alternative {@code RelNode} with supported filters/projects pushed-down to IO Rel.
+   */
+  private RelNode constructNodesWithPushDown(FieldAccessDescriptor resolved, RelBuilder relBuilder, BeamIOSourceRel ioSourceRel, BeamSqlTableFilter tableFilter, RelDataType calcDataType, List<RexNode> calcProjects) {
+    Schema newSchema =
+        SelectHelpers.getOutputSchema(ioSourceRel.getBeamSqlTable().getSchema(), resolved);
+    RelDataType calcInputType =
+        CalciteUtils.toCalciteRowType(newSchema, ioSourceRel.getCluster().getTypeFactory());
+
+    BeamIOSourceRel newIoSourceRel =
+        ioSourceRel.copy(calcInputType, newSchema.getFieldNames(), tableFilter);
+    relBuilder.push(newIoSourceRel);
+
+    List<RexNode> newProjects = new ArrayList<>();
+    List<RexNode> newFilter = new ArrayList<>();
+    // Ex: let's say the original fields are (number before each element is the index):
+    // {0:unused1, 1:id, 2:name, 3:unused2},
+    // where only 'id' and 'name' are being used. Then the new calcInputType should be as follows:
+    // {0:id, 1:name}.
+    // A mapping list will contain 2 entries: {0:1, 1:2},
+    // showing how used field names map to the original fields.
+    List<Integer> mapping =
+        resolved.getFieldsAccessed().stream()
+            .map(FieldDescriptor::getFieldId)
+            .collect(Collectors.toList());
+
+    // Map filters to new RexInputRef.
+    for (RexNode filter : tableFilter.getNotSupported()) {
+      newFilter.add(reMapRexNodeToNewInputs(filter, mapping));
+    }
+    // Map projects to new RexInputRef.
+    for (RexNode project : calcProjects) {
+      newProjects.add(reMapRexNodeToNewInputs(project, mapping));
+    }
+
+    relBuilder.filter(newFilter);
+    // Force to preserve named projects.
+    relBuilder.project(newProjects, calcDataType.getFieldNames(), true);
+
+    return relBuilder.build();
+  }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/ProjectSupport.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/ProjectSupport.java
index 2604d4c..732247e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/ProjectSupport.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/ProjectSupport.java
@@ -5,11 +5,11 @@
   WITHOUT_FIELD_REORDERING,
   WITH_FIELD_REORDERING;
 
-  public boolean isProjectSupported() {
+  public boolean isSupported() {
     return !this.equals(NONE);
   }
 
-  public boolean isFieldReorderingSupported() {
+  public boolean withFieldReordering() {
     return this.equals(WITH_FIELD_REORDERING);
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
index ce76c27..5ed9f8b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
@@ -128,16 +128,13 @@
         FieldAccessDescriptor.withFieldNames(fieldNames).resolve(getSchema());
     final Schema newSchema = SelectHelpers.getOutputSchema(getSchema(), resolved);
 
-    TypedRead<Row> builder = getBigQueryReadBuilder(newSchema);
+    TypedRead<Row> builder = getBigQueryReadBuilder(newSchema)
+        .withSelectedFields(fieldNames);
 
     if (!(filters instanceof DefaultTableFilter)) {
       throw new RuntimeException("Unimplemented at the moment.");
     }
 
-    if (!fieldNames.isEmpty()) {
-      builder = builder.withSelectedFields(fieldNames);
-    }
-
     return begin
         .apply("Read Input BQ Rows with push-down", builder);
   }