Merge pull request #9786 from davidcavazos/remove-old-element-wise-snippets

[BEAM-7389] Remove old element-wise snippets directory
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index b9d7bd7..e65d903 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -32,12 +32,15 @@
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.transforms.Select;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -151,12 +154,37 @@
     }
 
     @Override
+    public PCollection<Row> buildIOReader(
+        PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+      PCollection<Row> withAllFields = buildIOReader(begin);
+      if (fieldNames.isEmpty() && filters instanceof DefaultTableFilter) {
+        return withAllFields;
+      }
+
+      PCollection<Row> result = withAllFields;
+      if (!(filters instanceof DefaultTableFilter)) {
+        throw new UnsupportedOperationException("Filters are not supported yet.");
+      }
+
+      if (!fieldNames.isEmpty()) {
+        result = result.apply(Select.fieldNames(fieldNames.toArray(new String[0])));
+      }
+
+      return result;
+    }
+
+    @Override
     public POutput buildIOWriter(PCollection<Row> input) {
       input.apply(ParDo.of(new CollectorFn(tableWithRows)));
       return PDone.in(input.getPipeline());
     }
 
     @Override
+    public boolean supportsProjects() {
+      return true;
+    }
+
+    @Override
     public Schema getSchema() {
       return tableWithRows.table.getSchema();
     }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java
new file mode 100644
index 0000000..c65d593
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.test;
+
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestTableProviderTest {
+  private static final Schema BASIC_SCHEMA =
+      Schema.builder().addInt32Field("id").addStringField("name").build();
+  private BeamSqlTable beamSqlTable;
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Before
+  public void buildUp() {
+    TestTableProvider tableProvider = new TestTableProvider();
+    Table table = getTable("tableName");
+    tableProvider.createTable(table);
+    tableProvider.addRows(
+        table.getName(), row(BASIC_SCHEMA, 1, "one"), row(BASIC_SCHEMA, 2, "two"));
+
+    beamSqlTable = tableProvider.buildBeamSqlTable(table);
+  }
+
+  @Test
+  public void testInMemoryTableProvider_returnsSelectedColumns() {
+    PCollection<Row> result =
+        beamSqlTable.buildIOReader(
+            pipeline.begin(),
+            beamSqlTable.constructFilter(ImmutableList.of()),
+            ImmutableList.of("name"));
+
+    PAssert.that(result)
+        .containsInAnyOrder(row(result.getSchema(), "one"), row(result.getSchema(), "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testInMemoryTableProvider_withEmptySelectedColumns_returnsAllColumns() {
+    PCollection<Row> result =
+        beamSqlTable.buildIOReader(
+            pipeline.begin(), beamSqlTable.constructFilter(ImmutableList.of()), ImmutableList.of());
+
+    PAssert.that(result)
+        .containsInAnyOrder(row(result.getSchema(), 1, "one"), row(result.getSchema(), 2, "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testInMemoryTableProvider_withAllSelectedColumns_returnsAllColumns() {
+    PCollection<Row> result =
+        beamSqlTable.buildIOReader(
+            pipeline.begin(),
+            beamSqlTable.constructFilter(ImmutableList.of()),
+            ImmutableList.of("name", "id")); // Note that order is ignored
+
+    // Selected columns are returned in schema order, not the order requested.
+    PAssert.that(result)
+        .containsInAnyOrder(row(result.getSchema(), 1, "one"), row(result.getSchema(), 2, "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testInMemoryTableProvider_withDuplicateSelectedColumns_returnsSelectedColumnsOnce() {
+    PCollection<Row> result =
+        beamSqlTable.buildIOReader(
+            pipeline.begin(),
+            beamSqlTable.constructFilter(ImmutableList.of()),
+            ImmutableList.of("name", "name"));
+
+    PAssert.that(result)
+        .containsInAnyOrder(row(result.getSchema(), "one"), row(result.getSchema(), "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  private static Row row(Schema schema, Object... objects) {
+    return Row.withSchema(schema).addValues(objects).build();
+  }
+
+  private static Table getTable(String name) {
+    return Table.builder()
+        .name(name)
+        .comment(name + " table")
+        .schema(BASIC_SCHEMA)
+        .type("test")
+        .build();
+  }
+}
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 3e42485..4548488 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -119,7 +119,10 @@
     'oauth2client>=2.0.1,<4',
     'protobuf>=3.5.0.post1,<4',
     # [BEAM-6287] pyarrow is not supported on Windows for Python 2
-    ('pyarrow>=0.11.1,<0.15.0; python_version >= "3.0" or '
+    # [BEAM-8392] pyarrow 0.14.0 and 0.15.0 trigger an exception when importing
+    # apache_beam on macOS 10.15. Update the version bounds as soon as a fix
+    # is available.
+    ('pyarrow>=0.11.1,<0.14.0; python_version >= "3.0" or '
      'platform_system != "Windows"'),
     'pydot>=1.2.0,<2',
     'python-dateutil>=2.8.0,<3',
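
For reference, a minimal sketch (not part of this patch) of how the tightened pyarrow requirement and its environment marker behave. It assumes the third-party `packaging` library is available; the requirement string is copied verbatim from the hunk above.

    # Sketch only: parse the pyarrow requirement introduced above and check the
    # new upper bound and the existing Windows/Python 2 marker.
    from packaging.requirements import Requirement

    req = Requirement(
        'pyarrow>=0.11.1,<0.14.0; '
        'python_version >= "3.0" or platform_system != "Windows"')

    # 0.13.x is still allowed; 0.14.0 and 0.15.0 (broken on macOS 10.15) are not.
    print(req.specifier.contains("0.13.0"))  # True
    print(req.specifier.contains("0.14.0"))  # False

    # [BEAM-6287]: the marker skips the dependency only on Windows + Python 2.
    print(req.marker.evaluate())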