[BEAM-12609] Enable projection pushdown in SchemaIO. (#15216)
* [BEAM-12609] Enable projection pushdown in SchemaIO.
Introduce PushdownProjector interface and connect it to SchemaIOTableProviderWrapper.
* Throw errors if pushdown is unsupported.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/PushdownProjector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/PushdownProjector.java
new file mode 100644
index 0000000..0ae7808
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/PushdownProjector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.io;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * Factory for creating a {@link PTransform} that can execute a projection.
+ *
+ * <p>Typically this interface will be implemented by a reader {@link PTransform} that is capable of
+ * pushing down projection to an external source. For example, {@link SchemaIO#buildReader()} may
+ * return a {@link PushdownProjector} to which a projection may be applied later.
+ */
+@Experimental
+public interface PushdownProjector {
+ /**
+ * Returns a {@link PTransform} that will execute the projection specified by the {@link
+ * FieldAccessDescriptor}.
+ */
+ PTransform<? extends PInput, PCollection<Row>> withProjectionPushdown(
+ FieldAccessDescriptor fieldAccessDescriptor);
+
+  /**
+   * Returns true if this {@link PushdownProjector} can return the projected fields in an order
+   * different from the order in which they appear in its input.
+   */
+ boolean supportsFieldReordering();
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/DefaultTableFilter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/DefaultTableFilter.java
index 16a6906..d77e3df 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/DefaultTableFilter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/DefaultTableFilter.java
@@ -27,7 +27,7 @@
public final class DefaultTableFilter implements BeamSqlTableFilter {
private final List<RexNode> filters;
- DefaultTableFilter(List<RexNode> filters) {
+ public DefaultTableFilter(List<RexNode> filters) {
this.filters = filters;
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
index fb3dca3..34b640e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
@@ -22,16 +22,22 @@
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.Serializable;
+import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.PushdownProjector;
import org.apache.beam.sdk.schemas.io.SchemaIO;
import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
import org.apache.beam.sdk.transforms.PTransform;
@@ -119,6 +125,45 @@
}
@Override
+ public PCollection<Row> buildIOReader(
+ PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+ PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+ if (!(filters instanceof DefaultTableFilter)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Filter pushdown is not yet supported in %s. BEAM-12663",
+ SchemaIOTableWrapper.class));
+ }
+ if (!fieldNames.isEmpty()) {
+ if (readerTransform instanceof PushdownProjector) {
+ PushdownProjector pushdownProjector = (PushdownProjector) readerTransform;
+ FieldAccessDescriptor fieldAccessDescriptor =
+ FieldAccessDescriptor.withFieldNames(fieldNames);
+        // Unchecked cast: withProjectionPushdown must return a PTransform that is applicable to
+        // a PBegin, otherwise this cast fails at pipeline-construction time.
+ readerTransform =
+ (PTransform<PBegin, PCollection<Row>>)
+ pushdownProjector.withProjectionPushdown(fieldAccessDescriptor);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("%s does not support projection pushdown.", this.getClass()));
+ }
+ }
+ return begin.apply(readerTransform);
+ }
+
+ @Override
+ public ProjectSupport supportsProjects() {
+ PTransform<PBegin, PCollection<Row>> readerTransform = schemaIO.buildReader();
+ if (readerTransform instanceof PushdownProjector) {
+ return ((PushdownProjector) readerTransform).supportsFieldReordering()
+ ? ProjectSupport.WITH_FIELD_REORDERING
+ : ProjectSupport.WITHOUT_FIELD_REORDERING;
+ }
+ return ProjectSupport.NONE;
+ }
+
+ @Override
public POutput buildIOWriter(PCollection<Row> input) {
PTransform<PCollection<Row>, ? extends POutput> writerTransform = schemaIO.buildWriter();
return input.apply(writerTransform);
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapperTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapperTest.java
new file mode 100644
index 0000000..4b45e5a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapperTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import com.alibaba.fastjson.JSON;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests {@link org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper} using
+ * {@link org.apache.beam.sdk.extensions.sql.meta.provider.TestSchemaIOTableProviderWrapper}.
+ */
+@RunWith(JUnit4.class)
+public class SchemaIOTableProviderWrapperTest {
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+
+ private static final Schema inputSchema =
+ Schema.builder()
+ .addStringField("f_string")
+ .addInt64Field("f_long")
+ .addBooleanField("f_bool")
+ .build();
+ private static final List<Row> rows =
+ ImmutableList.of(
+ Row.withSchema(inputSchema).addValues("zero", 0L, false).build(),
+ Row.withSchema(inputSchema).addValues("one", 1L, true).build());
+ private final Table testTable =
+ Table.builder()
+ .name("table")
+ .comment("table")
+ .schema(inputSchema)
+ .properties(JSON.parseObject("{}"))
+ .type("test")
+ .build();
+
+ @BeforeClass
+ public static void setUp() {
+ TestSchemaIOTableProviderWrapper.addRows(rows.stream().toArray(Row[]::new));
+ }
+
+ @Test
+ public void testBuildIOReader() {
+ TestSchemaIOTableProviderWrapper provider = new TestSchemaIOTableProviderWrapper();
+ BeamSqlTable beamSqlTable = provider.buildBeamSqlTable(testTable);
+
+ PCollection<Row> result = beamSqlTable.buildIOReader(pipeline.begin());
+ PAssert.that(result).containsInAnyOrder(rows);
+
+ pipeline.run();
+ }
+
+ @Test
+ public void testBuildIOReader_withProjectionPushdown() {
+ TestSchemaIOTableProviderWrapper provider = new TestSchemaIOTableProviderWrapper();
+ BeamSqlTable beamSqlTable = provider.buildBeamSqlTable(testTable);
+
+ PCollection<Row> result =
+ beamSqlTable.buildIOReader(
+ pipeline.begin(),
+ new DefaultTableFilter(ImmutableList.of()),
+ ImmutableList.of("f_long"));
+ Schema outputSchema = Schema.builder().addInt64Field("f_long").build();
+ PAssert.that(result)
+ .containsInAnyOrder(
+ Row.withSchema(outputSchema).addValues(0L).build(),
+ Row.withSchema(outputSchema).addValues(1L).build());
+
+ pipeline.run();
+ }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/TestSchemaIOTableProviderWrapper.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/TestSchemaIOTableProviderWrapper.java
new file mode 100644
index 0000000..835fc5b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/TestSchemaIOTableProviderWrapper.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.PushdownProjector;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.schemas.transforms.Select;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A mock {@link org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper} that
+ * reads in-memory data for testing.
+ */
+@VisibleForTesting
+public class TestSchemaIOTableProviderWrapper extends SchemaIOTableProviderWrapper {
+ private static final List<Row> rows = new ArrayList<>();
+
+ @Override
+ public SchemaIOProvider getSchemaIOProvider() {
+ return new TestSchemaIOProvider();
+ }
+
+ public static void addRows(Row... newRows) {
+ rows.addAll(Arrays.asList(newRows));
+ }
+
+ private class TestSchemaIOProvider implements SchemaIOProvider {
+ @Override
+ public String identifier() {
+ return "TestSchemaIOProvider";
+ }
+
+ @Override
+ public Schema configurationSchema() {
+ return Schema.of();
+ }
+
+ @Override
+ public SchemaIO from(String location, Row configuration, @Nullable Schema dataSchema) {
+ return new TestSchemaIO(dataSchema);
+ }
+
+ @Override
+ public boolean requiresDataSchema() {
+ return true;
+ }
+
+ @Override
+ public PCollection.IsBounded isBounded() {
+ return PCollection.IsBounded.BOUNDED;
+ }
+ }
+
+ private class TestSchemaIO implements SchemaIO {
+ private final Schema schema;
+
+ TestSchemaIO(Schema schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public Schema schema() {
+ return schema;
+ }
+
+ @Override
+ public PTransform<PBegin, PCollection<Row>> buildReader() {
+ // Read all fields by default.
+ return new TestPushdownProjector(schema, FieldAccessDescriptor.withAllFields());
+ }
+
+ @Override
+ public PTransform<PCollection<Row>, ? extends POutput> buildWriter() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * {@link PTransform} that reads in-memory data for testing. Simulates projection pushdown using
+ * {@link Select}.
+ */
+ private class TestPushdownProjector extends PTransform<PBegin, PCollection<Row>>
+ implements PushdownProjector {
+ /** The schema of the input data. */
+ private final Schema schema;
+ /** The fields to be projected. */
+ private final FieldAccessDescriptor fieldAccessDescriptor;
+
+ TestPushdownProjector(Schema schema, FieldAccessDescriptor fieldAccessDescriptor) {
+ this.schema = schema;
+ this.fieldAccessDescriptor = fieldAccessDescriptor;
+ }
+
+ @Override
+ public PTransform<? extends PInput, PCollection<Row>> withProjectionPushdown(
+ FieldAccessDescriptor fieldAccessDescriptor) {
+ return new TestPushdownProjector(schema, fieldAccessDescriptor);
+ }
+
+ @Override
+ public boolean supportsFieldReordering() {
+ return true;
+ }
+
+ @Override
+ public PCollection<Row> expand(PBegin input) {
+ // Simulate projection pushdown using Select. In a real IO, projection would be pushed down to
+ // the source.
+ return input
+ .apply(Create.of(rows).withRowSchema(schema))
+ .apply(Select.fieldAccess(fieldAccessDescriptor));
+ }
+ }
+}