Merge pull request #9786 from davidcavazos/remove-old-element-wise-snippets
[BEAM-7389] Remove old element-wise snippets directory
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index b9d7bd7..e65d903 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -32,12 +32,15 @@
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
@@ -151,12 +154,37 @@
}
@Override
+ public PCollection<Row> buildIOReader(
+ PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+ PCollection<Row> withAllFields = buildIOReader(begin);
+ if (fieldNames.isEmpty() && filters instanceof DefaultTableFilter) {
+ return withAllFields;
+ }
+
+ PCollection<Row> result = withAllFields;
+ if (!(filters instanceof DefaultTableFilter)) {
+ throw new RuntimeException("Unimplemented at the moment.");
+ }
+
+ if (!fieldNames.isEmpty()) {
+ result = result.apply(Select.fieldNames(fieldNames.toArray(new String[0])));
+ }
+
+ return result;
+ }
+
+ @Override
public POutput buildIOWriter(PCollection<Row> input) {
input.apply(ParDo.of(new CollectorFn(tableWithRows)));
return PDone.in(input.getPipeline());
}
@Override
+ public boolean supportsProjects() {
+ return true;
+ }
+
+ @Override
public Schema getSchema() {
return tableWithRows.table.getSchema();
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java
new file mode 100644
index 0000000..c65d593
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.test;
+
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestTableProviderTest {
+ private static final Schema BASIC_SCHEMA =
+ Schema.builder().addInt32Field("id").addStringField("name").build();
+ private BeamSqlTable beamSqlTable;
+
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+
+ @Before
+ public void buildUp() {
+ TestTableProvider tableProvider = new TestTableProvider();
+ Table table = getTable("tableName");
+ tableProvider.createTable(table);
+ tableProvider.addRows(
+ table.getName(), row(BASIC_SCHEMA, 1, "one"), row(BASIC_SCHEMA, 2, "two"));
+
+ beamSqlTable = tableProvider.buildBeamSqlTable(table);
+ }
+
+ @Test
+ public void testInMemoryTableProvider_returnsSelectedColumns() {
+ PCollection<Row> result =
+ beamSqlTable.buildIOReader(
+ pipeline.begin(),
+ beamSqlTable.constructFilter(ImmutableList.of()),
+ ImmutableList.of("name"));
+
+ PAssert.that(result)
+ .containsInAnyOrder(row(result.getSchema(), "one"), row(result.getSchema(), "two"));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testInMemoryTableProvider_withEmptySelectedColumns_returnsAllColumns() {
+ PCollection<Row> result =
+ beamSqlTable.buildIOReader(
+ pipeline.begin(), beamSqlTable.constructFilter(ImmutableList.of()), ImmutableList.of());
+
+ PAssert.that(result)
+ .containsInAnyOrder(row(result.getSchema(), 1, "one"), row(result.getSchema(), 2, "two"));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testInMemoryTableProvider_withAllSelectedColumns_returnsAllColumns() {
+ PCollection<Row> result =
+ beamSqlTable.buildIOReader(
+ pipeline.begin(),
+ beamSqlTable.constructFilter(ImmutableList.of()),
+ ImmutableList.of("name", "id")); // Note that order is ignored
+
+ // Selected columns are output in the same order they are listed in the schema.
+ PAssert.that(result)
+ .containsInAnyOrder(row(result.getSchema(), 1, "one"), row(result.getSchema(), 2, "two"));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testInMemoryTableProvider_withDuplicateSelectedColumns_returnsSelectedColumnsOnce() {
+ PCollection<Row> result =
+ beamSqlTable.buildIOReader(
+ pipeline.begin(),
+ beamSqlTable.constructFilter(ImmutableList.of()),
+ ImmutableList.of("name", "name"));
+
+ PAssert.that(result)
+ .containsInAnyOrder(row(result.getSchema(), "one"), row(result.getSchema(), "two"));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ private static Row row(Schema schema, Object... objects) {
+ return Row.withSchema(schema).addValues(objects).build();
+ }
+
+ private static Table getTable(String name) {
+ return Table.builder()
+ .name(name)
+ .comment(name + " table")
+ .schema(BASIC_SCHEMA)
+ .type("test")
+ .build();
+ }
+}
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 3e42485..4548488 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -119,7 +119,10 @@
'oauth2client>=2.0.1,<4',
'protobuf>=3.5.0.post1,<4',
# [BEAM-6287] pyarrow is not supported on Windows for Python 2
- ('pyarrow>=0.11.1,<0.15.0; python_version >= "3.0" or '
+# [BEAM-8392] pyarrow 0.14.0 and 0.15.0 trigger an exception when importing
+ # apache_beam on macOS 10.15. Update version bounds as soon as the fix is
+ # ready
+ ('pyarrow>=0.11.1,<0.14.0; python_version >= "3.0" or '
'platform_system != "Windows"'),
'pydot>=1.2.0,<2',
'python-dateutil>=2.8.0,<3',