[BEAM-12609] Enable projection pushdown in SchemaIO. (#15216)

* [BEAM-12609] Enable projection pushdown in SchemaIO.

Introduce PushdownProjector interface and connect it to SchemaIOTableProviderWrapper.

* Throw errors if pushdown is unsupported.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/PushdownProjector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/PushdownProjector.java
new file mode 100644
index 0000000..0ae7808
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/PushdownProjector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.io;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * Factory for creating a {@link PTransform} that can execute a projection.
+ *
+ * <p>Typically this interface will be implemented by a reader {@link PTransform} that is capable of
+ * pushing down projection to an external source. For example, {@link SchemaIO#buildReader()} may
+ * return a {@link PushdownProjector} to which a projection may be applied later.
+ */
+@Experimental
+public interface PushdownProjector {
+  /**
+   * Returns a {@link PTransform} that executes the projection given by {@code
+   * fieldAccessDescriptor}. Callers must ensure the transform accepts the input they apply it to.
+   */
+  PTransform<? extends PInput, PCollection<Row>> withProjectionPushdown(
+      FieldAccessDescriptor fieldAccessDescriptor);
+
+  /**
+   * Returns true if this instance can do a projection that returns fields in a different order than
+   * the projection's inputs.
+   */
+  boolean supportsFieldReordering();
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/DefaultTableFilter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/DefaultTableFilter.java
index 16a6906..d77e3df 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/DefaultTableFilter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/DefaultTableFilter.java
@@ -27,7 +27,7 @@
 public final class DefaultTableFilter implements BeamSqlTableFilter {
   private final List<RexNode> filters;
 
-  DefaultTableFilter(List<RexNode> filters) {
+  public DefaultTableFilter(List<RexNode> filters) {
     this.filters = filters;
   }
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
index fb3dca3..34b640e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
@@ -22,16 +22,22 @@
 import com.alibaba.fastjson.JSONObject;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.Serializable;
+import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
 import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.PushdownProjector;
 import org.apache.beam.sdk.schemas.io.SchemaIO;
 import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -119,6 +125,45 @@
     }
 
     @Override
+    public PCollection<Row> buildIOReader(
+        PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+      PTransform<PBegin, PCollection<Row>> reader = schemaIO.buildReader();
+      if (!(filters instanceof DefaultTableFilter)) {
+        String message =
+            String.format(
+                "Filter pushdown is not yet supported in %s. BEAM-12663",
+                SchemaIOTableWrapper.class);
+        throw new UnsupportedOperationException(message);
+      }
+      // An empty field list requests no projection: apply the reader as built.
+      if (fieldNames.isEmpty()) {
+        return begin.apply(reader);
+      }
+      if (!(reader instanceof PushdownProjector)) {
+        throw new UnsupportedOperationException(
+            String.format("%s does not support projection pushdown.", this.getClass()));
+      }
+      FieldAccessDescriptor projectedFields = FieldAccessDescriptor.withFieldNames(fieldNames);
+      // The pushdown must return a PTransform that can be applied to a PBegin; the unchecked
+      // cast below relies on that.
+      PTransform<PBegin, PCollection<Row>> projectedReader =
+          (PTransform<PBegin, PCollection<Row>>)
+              ((PushdownProjector) reader).withProjectionPushdown(projectedFields);
+      return begin.apply(projectedReader);
+    }
+
+    @Override
+    public ProjectSupport supportsProjects() {
+      PTransform<PBegin, PCollection<Row>> reader = schemaIO.buildReader();
+      if (!(reader instanceof PushdownProjector)) {
+        return ProjectSupport.NONE; // The reader offers no projection pushdown hook.
+      }
+      return ((PushdownProjector) reader).supportsFieldReordering()
+          ? ProjectSupport.WITH_FIELD_REORDERING
+          : ProjectSupport.WITHOUT_FIELD_REORDERING;
+    }
+
+    @Override
     public POutput buildIOWriter(PCollection<Row> input) {
       PTransform<PCollection<Row>, ? extends POutput> writerTransform = schemaIO.buildWriter();
       return input.apply(writerTransform);
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapperTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapperTest.java
new file mode 100644
index 0000000..4b45e5a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapperTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import com.alibaba.fastjson.JSON;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Verifies the read path of {@link SchemaIOTableProviderWrapper}, with and without projection
+ * pushdown, against the in-memory {@link TestSchemaIOTableProviderWrapper}.
+ */
+@RunWith(JUnit4.class)
+public class SchemaIOTableProviderWrapperTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  private static final Schema INPUT_SCHEMA =
+      Schema.builder()
+          .addStringField("f_string")
+          .addInt64Field("f_long")
+          .addBooleanField("f_bool")
+          .build();
+  private static final List<Row> INPUT_ROWS =
+      ImmutableList.of(
+          Row.withSchema(INPUT_SCHEMA).addValues("zero", 0L, false).build(),
+          Row.withSchema(INPUT_SCHEMA).addValues("one", 1L, true).build());
+  private final Table testTable =
+      Table.builder()
+          .name("table")
+          .comment("table")
+          .schema(INPUT_SCHEMA)
+          .properties(JSON.parseObject("{}"))
+          .type("test")
+          .build();
+
+  @BeforeClass
+  public static void setUp() {
+    TestSchemaIOTableProviderWrapper.addRows(INPUT_ROWS.toArray(new Row[0]));
+  }
+
+  @Test
+  public void testBuildIOReader() {
+    BeamSqlTable table = new TestSchemaIOTableProviderWrapper().buildBeamSqlTable(testTable);
+
+    // Without pushdown arguments, the registered rows are expected back with all fields.
+    PCollection<Row> allRows = table.buildIOReader(pipeline.begin());
+    PAssert.that(allRows).containsInAnyOrder(INPUT_ROWS);
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testBuildIOReader_withProjectionPushdown() {
+    BeamSqlTable table = new TestSchemaIOTableProviderWrapper().buildBeamSqlTable(testTable);
+    // The wrapper accepts only DefaultTableFilter; projection is requested via field names.
+    DefaultTableFilter noFilter = new DefaultTableFilter(ImmutableList.of());
+    List<String> projectedFields = ImmutableList.of("f_long");
+
+    PCollection<Row> projected = table.buildIOReader(pipeline.begin(), noFilter, projectedFields);
+
+    // Only the projected column is expected in the output rows.
+    Schema projectedSchema = Schema.builder().addInt64Field("f_long").build();
+    PAssert.that(projected)
+        .containsInAnyOrder(
+            Row.withSchema(projectedSchema).addValues(0L).build(),
+            Row.withSchema(projectedSchema).addValues(1L).build());
+
+    pipeline.run();
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/TestSchemaIOTableProviderWrapper.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/TestSchemaIOTableProviderWrapper.java
new file mode 100644
index 0000000..835fc5b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/TestSchemaIOTableProviderWrapper.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.PushdownProjector;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.transforms.Select;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A mock {@link org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper} that
+ * reads the in-memory rows registered through {@link #addRows}, for testing.
+ */
+@VisibleForTesting
+public class TestSchemaIOTableProviderWrapper extends SchemaIOTableProviderWrapper {
+  private static final List<Row> rows = new ArrayList<>();
+
+  @Override
+  public SchemaIOProvider getSchemaIOProvider() {
+    return new TestSchemaIOProvider();
+  }
+
+  public static void addRows(Row... newRows) {
+    rows.addAll(Arrays.asList(newRows));
+  }
+
+  private static class TestSchemaIOProvider implements SchemaIOProvider {
+    @Override
+    public String identifier() {
+      return "TestSchemaIOProvider";
+    }
+
+    @Override
+    public Schema configurationSchema() {
+      return Schema.of();
+    }
+
+    @Override
+    public SchemaIO from(String location, Row configuration, @Nullable Schema dataSchema) {
+      return new TestSchemaIO(dataSchema);
+    }
+
+    @Override
+    public boolean requiresDataSchema() {
+      return true;
+    }
+
+    @Override
+    public PCollection.IsBounded isBounded() {
+      return PCollection.IsBounded.BOUNDED;
+    }
+  }
+
+  private static class TestSchemaIO implements SchemaIO {
+    private final Schema schema;
+
+    TestSchemaIO(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public Schema schema() {
+      return schema;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildReader() {
+      // Read all fields by default.
+      return new TestPushdownProjector(schema, FieldAccessDescriptor.withAllFields());
+    }
+
+    @Override
+    public PTransform<PCollection<Row>, ? extends POutput> buildWriter() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * {@link PTransform} that reads in-memory data for testing. Simulates projection pushdown using
+   * {@link Select}. Static so that instances hold no reference to the enclosing wrapper.
+   */
+  private static class TestPushdownProjector extends PTransform<PBegin, PCollection<Row>>
+      implements PushdownProjector {
+    /** The schema of the input data. */
+    private final Schema schema;
+    /** The fields to be projected. */
+    private final FieldAccessDescriptor fieldAccessDescriptor;
+
+    TestPushdownProjector(Schema schema, FieldAccessDescriptor fieldAccessDescriptor) {
+      this.schema = schema;
+      this.fieldAccessDescriptor = fieldAccessDescriptor;
+    }
+
+    @Override
+    public PTransform<? extends PInput, PCollection<Row>> withProjectionPushdown(
+        FieldAccessDescriptor fieldAccessDescriptor) {
+      return new TestPushdownProjector(schema, fieldAccessDescriptor);
+    }
+
+    @Override
+    public boolean supportsFieldReordering() {
+      return true;
+    }
+
+    @Override
+    public PCollection<Row> expand(PBegin input) {
+      // Simulate projection pushdown using Select. In a real IO, projection would be pushed down to
+      // the source.
+      return input
+          .apply(Create.of(rows).withRowSchema(schema))
+          .apply(Select.fieldAccess(fieldAccessDescriptor));
+    }
+  }
+}