Merge pull request #9773 from RyanSkraba/BEAM-7073-avro-sql-unit-test
[BEAM-7073]: Add unit test for Avro logical type datum.
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index 24edb28..b6e040c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -70,9 +70,6 @@
}
PortablePipelineOptions portableOptions = flinkOptions.as(PortablePipelineOptions.class);
- if (portableOptions.getSdkWorkerParallelism() == 0L) {
- portableOptions.setSdkWorkerParallelism(serverConfig.getSdkWorkerParallelism());
- }
PortablePipelineRunner pipelineRunner;
if (portableOptions.getOutputExecutablePath() == null
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
index a90f7e6..1e345d0 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
@@ -43,7 +43,6 @@
assertThat(config.getArtifactPort(), is(8098));
assertThat(config.getExpansionPort(), is(8097));
assertThat(config.getFlinkMasterUrl(), is("[auto]"));
- assertThat(config.getSdkWorkerParallelism(), is(1L));
assertThat(config.isCleanArtifactsPerJob(), is(true));
FlinkJobServerDriver flinkJobServerDriver = FlinkJobServerDriver.fromConfig(config);
assertThat(flinkJobServerDriver, is(not(nullValue())));
@@ -62,7 +61,6 @@
"--expansion-port",
"44",
"--flink-master-url=jobmanager",
- "--sdk-worker-parallelism=4",
"--clean-artifacts-per-job=false",
});
FlinkJobServerDriver.FlinkServerConfiguration config =
@@ -72,7 +70,6 @@
assertThat(config.getArtifactPort(), is(43));
assertThat(config.getExpansionPort(), is(44));
assertThat(config.getFlinkMasterUrl(), is("jobmanager"));
- assertThat(config.getSdkWorkerParallelism(), is(4L));
assertThat(config.isCleanArtifactsPerJob(), is(false));
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
index f8977ff..0c5bf94 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
@@ -94,15 +94,6 @@
handler = ExplicitBooleanOptionHandler.class)
private boolean cleanArtifactsPerJob = true;
- @Option(
- name = "--sdk-worker-parallelism",
- usage =
- "Default parallelism for SDK worker processes. This option is only applied when the "
- + "pipeline option sdkWorkerParallelism is set to 0."
- + "Default is 1, If 0, worker parallelism will be dynamically decided by runner."
- + "See also: sdkWorkerParallelism Pipeline Option")
- private long sdkWorkerParallelism = 1L;
-
public String getHost() {
return host;
}
@@ -126,10 +117,6 @@
public boolean isCleanArtifactsPerJob() {
return cleanArtifactsPerJob;
}
-
- public long getSdkWorkerParallelism() {
- return this.sdkWorkerParallelism;
- }
}
protected static ServerFactory createJobServerFactory(ServerConfiguration configuration) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index b9d7bd7..e65d903 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -32,12 +32,15 @@
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
@@ -151,12 +154,37 @@
}
@Override
+ public PCollection<Row> buildIOReader(
+ PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+ PCollection<Row> withAllFields = buildIOReader(begin);
+ if (fieldNames.isEmpty() && filters instanceof DefaultTableFilter) {
+ return withAllFields;
+ }
+
+ PCollection<Row> result = withAllFields;
+ if (!(filters instanceof DefaultTableFilter)) {
+ throw new RuntimeException("Unimplemented at the moment.");
+ }
+
+ if (!fieldNames.isEmpty()) {
+ result = result.apply(Select.fieldNames(fieldNames.toArray(new String[0])));
+ }
+
+ return result;
+ }
+
+ @Override
public POutput buildIOWriter(PCollection<Row> input) {
input.apply(ParDo.of(new CollectorFn(tableWithRows)));
return PDone.in(input.getPipeline());
}
@Override
+ public boolean supportsProjects() {
+ return true;
+ }
+
+ @Override
public Schema getSchema() {
return tableWithRows.table.getSchema();
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java
new file mode 100644
index 0000000..c65d593
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.test;
+
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestTableProviderTest {
+ private static final Schema BASIC_SCHEMA =
+ Schema.builder().addInt32Field("id").addStringField("name").build();
+ private BeamSqlTable beamSqlTable;
+
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+
+ @Before
+ public void buildUp() {
+ TestTableProvider tableProvider = new TestTableProvider();
+ Table table = getTable("tableName");
+ tableProvider.createTable(table);
+ tableProvider.addRows(
+ table.getName(), row(BASIC_SCHEMA, 1, "one"), row(BASIC_SCHEMA, 2, "two"));
+
+ beamSqlTable = tableProvider.buildBeamSqlTable(table);
+ }
+
+ @Test
+ public void testInMemoryTableProvider_returnsSelectedColumns() {
+ PCollection<Row> result =
+ beamSqlTable.buildIOReader(
+ pipeline.begin(),
+ beamSqlTable.constructFilter(ImmutableList.of()),
+ ImmutableList.of("name"));
+
+ PAssert.that(result)
+ .containsInAnyOrder(row(result.getSchema(), "one"), row(result.getSchema(), "two"));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testInMemoryTableProvider_withEmptySelectedColumns_returnsAllColumns() {
+ PCollection<Row> result =
+ beamSqlTable.buildIOReader(
+ pipeline.begin(), beamSqlTable.constructFilter(ImmutableList.of()), ImmutableList.of());
+
+ PAssert.that(result)
+ .containsInAnyOrder(row(result.getSchema(), 1, "one"), row(result.getSchema(), 2, "two"));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testInMemoryTableProvider_withAllSelectedColumns_returnsAllColumns() {
+ PCollection<Row> result =
+ beamSqlTable.buildIOReader(
+ pipeline.begin(),
+ beamSqlTable.constructFilter(ImmutableList.of()),
+ ImmutableList.of("name", "id")); // Note that order is ignored
+
+ // Selected columns are outputted in the same order they are listed in the schema.
+ PAssert.that(result)
+ .containsInAnyOrder(row(result.getSchema(), 1, "one"), row(result.getSchema(), 2, "two"));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testInMemoryTableProvider_withDuplicateSelectedColumns_returnsSelectedColumnsOnce() {
+ PCollection<Row> result =
+ beamSqlTable.buildIOReader(
+ pipeline.begin(),
+ beamSqlTable.constructFilter(ImmutableList.of()),
+ ImmutableList.of("name", "name"));
+
+ PAssert.that(result)
+ .containsInAnyOrder(row(result.getSchema(), "one"), row(result.getSchema(), "two"));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ private static Row row(Schema schema, Object... objects) {
+ return Row.withSchema(schema).addValues(objects).build();
+ }
+
+ private static Table getTable(String name) {
+ return Table.builder()
+ .name(name)
+ .comment(name + " table")
+ .schema(BASIC_SCHEMA)
+ .type("test")
+ .build();
+ }
+}
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
index 79f9db0..30add9b 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
@@ -36,7 +36,9 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.core.type.WritableTypeId;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
@@ -156,8 +158,9 @@
SerializerProvider serializers,
TypeSerializer typeSerializer)
throws IOException {
- typeSerializer.writeTypePrefixForObject(credentialsProvider, jsonGenerator);
-
+ WritableTypeId typeId =
+ typeSerializer.writeTypePrefix(
+ jsonGenerator, typeSerializer.typeId(credentialsProvider, JsonToken.START_OBJECT));
if (credentialsProvider.getClass().equals(AWSStaticCredentialsProvider.class)) {
jsonGenerator.writeStringField(
AWS_ACCESS_KEY_ID, credentialsProvider.getCredentials().getAWSAccessKeyId());
@@ -197,7 +200,7 @@
throw new IllegalArgumentException(
"Unsupported AWS credentials provider type " + credentialsProvider.getClass());
}
- typeSerializer.writeTypeSuffixForObject(credentialsProvider, jsonGenerator);
+ typeSerializer.writeTypeSuffix(jsonGenerator, typeId);
}
}
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java
index 4adad67..79e8d15 100644
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java
@@ -20,7 +20,9 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.core.type.WritableTypeId;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
@@ -139,7 +141,9 @@
SerializerProvider serializer,
TypeSerializer typeSerializer)
throws IOException {
- typeSerializer.writeTypePrefixForObject(credentialsProvider, jsonGenerator);
+ WritableTypeId typeId =
+ typeSerializer.writeTypePrefix(
+ jsonGenerator, typeSerializer.typeId(credentialsProvider, JsonToken.START_OBJECT));
if (credentialsProvider.getClass().equals(StaticCredentialsProvider.class)) {
jsonGenerator.writeStringField(
ACCESS_KEY_ID, credentialsProvider.resolveCredentials().accessKeyId());
@@ -149,7 +153,7 @@
throw new IllegalArgumentException(
"Unsupported AWS credentials provider type " + credentialsProvider.getClass());
}
- typeSerializer.writeTypeSuffixForObject(credentialsProvider, jsonGenerator);
+ typeSerializer.writeTypeSuffix(jsonGenerator, typeId);
}
}
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/__init__.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/__init__.py
deleted file mode 100644
index 6569e3f..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py
deleted file mode 100644
index 44b11b8..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py
+++ /dev/null
@@ -1,182 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def filter_function(test=None):
- # [START filter_function]
- import apache_beam as beam
-
- def is_perennial(plant):
- return plant['duration'] == 'perennial'
-
- with beam.Pipeline() as pipeline:
- perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Filter perennials' >> beam.Filter(is_perennial)
- | beam.Map(print)
- )
- # [END filter_function]
- if test:
- test(perennials)
-
-
-def filter_lambda(test=None):
- # [START filter_lambda]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Filter perennials' >> beam.Filter(
- lambda plant: plant['duration'] == 'perennial')
- | beam.Map(print)
- )
- # [END filter_lambda]
- if test:
- test(perennials)
-
-
-def filter_multiple_arguments(test=None):
- # [START filter_multiple_arguments]
- import apache_beam as beam
-
- def has_duration(plant, duration):
- return plant['duration'] == duration
-
- with beam.Pipeline() as pipeline:
- perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Filter perennials' >> beam.Filter(has_duration, 'perennial')
- | beam.Map(print)
- )
- # [END filter_multiple_arguments]
- if test:
- test(perennials)
-
-
-def filter_side_inputs_singleton(test=None):
- # [START filter_side_inputs_singleton]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- perennial = pipeline | 'Perennial' >> beam.Create(['perennial'])
-
- perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Filter perennials' >> beam.Filter(
- lambda plant, duration: plant['duration'] == duration,
- duration=beam.pvalue.AsSingleton(perennial),
- )
- | beam.Map(print)
- )
- # [END filter_side_inputs_singleton]
- if test:
- test(perennials)
-
-
-def filter_side_inputs_iter(test=None):
- # [START filter_side_inputs_iter]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- valid_durations = pipeline | 'Valid durations' >> beam.Create([
- 'annual',
- 'biennial',
- 'perennial',
- ])
-
- valid_plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': '🥔', 'name': 'Potato', 'duration': 'PERENNIAL'},
- ])
- | 'Filter valid plants' >> beam.Filter(
- lambda plant, valid_durations: plant['duration'] in valid_durations,
- valid_durations=beam.pvalue.AsIter(valid_durations),
- )
- | beam.Map(print)
- )
- # [END filter_side_inputs_iter]
- if test:
- test(valid_plants)
-
-
-def filter_side_inputs_dict(test=None):
- # [START filter_side_inputs_dict]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- keep_duration = pipeline | 'Duration filters' >> beam.Create([
- ('annual', False),
- ('biennial', False),
- ('perennial', True),
- ])
-
- perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Filter plants by duration' >> beam.Filter(
- lambda plant, keep_duration: keep_duration[plant['duration']],
- keep_duration=beam.pvalue.AsDict(keep_duration),
- )
- | beam.Map(print)
- )
- # [END filter_side_inputs_dict]
- if test:
- test(perennials)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py
deleted file mode 100644
index 02da146..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.examples.snippets.transforms.element_wise.filter import *
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.filter.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class FilterTest(unittest.TestCase):
- def __init__(self, methodName):
- super(FilterTest, self).__init__(methodName)
- # [START perennials]
- perennials = [
- {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
- ]
- # [END perennials]
- self.perennials_test = lambda actual: \
- assert_that(actual, equal_to(perennials))
-
- # [START valid_plants]
- valid_plants = [
- {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
- ]
- # [END valid_plants]
- self.valid_plants_test = lambda actual: \
- assert_that(actual, equal_to(valid_plants))
-
- def test_filter_function(self):
- filter_function(self.perennials_test)
-
- def test_filter_lambda(self):
- filter_lambda(self.perennials_test)
-
- def test_filter_multiple_arguments(self):
- filter_multiple_arguments(self.perennials_test)
-
- def test_filter_side_inputs_singleton(self):
- filter_side_inputs_singleton(self.perennials_test)
-
- def test_filter_side_inputs_iter(self):
- filter_side_inputs_iter(self.valid_plants_test)
-
- def test_filter_side_inputs_dict(self):
- filter_side_inputs_dict(self.perennials_test)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys.py
deleted file mode 100644
index 01c9d6b..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def keys(test=None):
- # [START keys]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- icons = (
- pipeline
- | 'Garden plants' >> beam.Create([
- ('🍓', 'Strawberry'),
- ('🥕', 'Carrot'),
- ('🍆', 'Eggplant'),
- ('🍅', 'Tomato'),
- ('🥔', 'Potato'),
- ])
- | 'Keys' >> beam.Keys()
- | beam.Map(print)
- )
- # [END keys]
- if test:
- test(icons)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys_test.py
deleted file mode 100644
index 9cb4909..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys_test.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.examples.snippets.transforms.element_wise.keys import *
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.keys.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class KeysTest(unittest.TestCase):
- def __init__(self, methodName):
- super(KeysTest, self).__init__(methodName)
- # [START icons]
- icons = [
- '🍓',
- '🥕',
- '🍆',
- '🍅',
- '🥔',
- ]
- # [END icons]
- self.icons_test = lambda actual: assert_that(actual, equal_to(icons))
-
- def test_keys(self):
- keys(self.icons_test)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap.py
deleted file mode 100644
index 2107fd5..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def kvswap(test=None):
- # [START kvswap]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Garden plants' >> beam.Create([
- ('🍓', 'Strawberry'),
- ('🥕', 'Carrot'),
- ('🍆', 'Eggplant'),
- ('🍅', 'Tomato'),
- ('🥔', 'Potato'),
- ])
- | 'Key-Value swap' >> beam.KvSwap()
- | beam.Map(print)
- )
- # [END kvswap]
- if test:
- test(plants)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap_test.py
deleted file mode 100644
index 85fa9dc..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap_test.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.examples.snippets.transforms.element_wise.kvswap import *
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.kvswap.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class KvSwapTest(unittest.TestCase):
- def __init__(self, methodName):
- super(KvSwapTest, self).__init__(methodName)
- # [START plants]
- plants = [
- ('Strawberry', '🍓'),
- ('Carrot', '🥕'),
- ('Eggplant', '🍆'),
- ('Tomato', '🍅'),
- ('Potato', '🥔'),
- ]
- # [END plants]
- self.plants_test = lambda actual: assert_that(actual, equal_to(plants))
-
- def test_kvswap(self):
- kvswap(self.plants_test)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map.py
deleted file mode 100644
index 9defd47..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map.py
+++ /dev/null
@@ -1,226 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def map_simple(test=None):
- # [START map_simple]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- ' πStrawberry \n',
- ' π₯Carrot \n',
- ' πEggplant \n',
- ' π
Tomato \n',
- ' π₯Potato \n',
- ])
- | 'Strip' >> beam.Map(str.strip)
- | beam.Map(print)
- )
- # [END map_simple]
- if test:
- test(plants)
-
-
-def map_function(test=None):
- # [START map_function]
- import apache_beam as beam
-
- def strip_header_and_newline(text):
- return text.strip('# \n')
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- '# πStrawberry\n',
- '# π₯Carrot\n',
- '# πEggplant\n',
- '# π
Tomato\n',
- '# π₯Potato\n',
- ])
- | 'Strip header' >> beam.Map(strip_header_and_newline)
- | beam.Map(print)
- )
- # [END map_function]
- if test:
- test(plants)
-
-
-def map_lambda(test=None):
- # [START map_lambda]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- '# πStrawberry\n',
- '# π₯Carrot\n',
- '# πEggplant\n',
- '# π
Tomato\n',
- '# π₯Potato\n',
- ])
- | 'Strip header' >> beam.Map(lambda text: text.strip('# \n'))
- | beam.Map(print)
- )
- # [END map_lambda]
- if test:
- test(plants)
-
-
-def map_multiple_arguments(test=None):
- # [START map_multiple_arguments]
- import apache_beam as beam
-
- def strip(text, chars=None):
- return text.strip(chars)
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- '# πStrawberry\n',
- '# π₯Carrot\n',
- '# πEggplant\n',
- '# π
Tomato\n',
- '# π₯Potato\n',
- ])
- | 'Strip header' >> beam.Map(strip, chars='# \n')
- | beam.Map(print)
- )
- # [END map_multiple_arguments]
- if test:
- test(plants)
-
-
-def map_tuple(test=None):
- # [START map_tuple]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- ('π', 'Strawberry'),
- ('π₯', 'Carrot'),
- ('π', 'Eggplant'),
- ('π
', 'Tomato'),
- ('π₯', 'Potato'),
- ])
- | 'Format' >> beam.MapTuple(
- lambda icon, plant: '{}{}'.format(icon, plant))
- | beam.Map(print)
- )
- # [END map_tuple]
- if test:
- test(plants)
-
-
-def map_side_inputs_singleton(test=None):
- # [START map_side_inputs_singleton]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- chars = pipeline | 'Create chars' >> beam.Create(['# \n'])
-
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- '# πStrawberry\n',
- '# π₯Carrot\n',
- '# πEggplant\n',
- '# π
Tomato\n',
- '# π₯Potato\n',
- ])
- | 'Strip header' >> beam.Map(
- lambda text, chars: text.strip(chars),
- chars=beam.pvalue.AsSingleton(chars),
- )
- | beam.Map(print)
- )
- # [END map_side_inputs_singleton]
- if test:
- test(plants)
-
-
-def map_side_inputs_iter(test=None):
- # [START map_side_inputs_iter]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- chars = pipeline | 'Create chars' >> beam.Create(['#', ' ', '\n'])
-
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- '# πStrawberry\n',
- '# π₯Carrot\n',
- '# πEggplant\n',
- '# π
Tomato\n',
- '# π₯Potato\n',
- ])
- | 'Strip header' >> beam.Map(
- lambda text, chars: text.strip(''.join(chars)),
- chars=beam.pvalue.AsIter(chars),
- )
- | beam.Map(print)
- )
- # [END map_side_inputs_iter]
- if test:
- test(plants)
-
-
-def map_side_inputs_dict(test=None):
- # [START map_side_inputs_dict]
- import apache_beam as beam
-
- def replace_duration(plant, durations):
- plant['duration'] = durations[plant['duration']]
- return plant
-
- with beam.Pipeline() as pipeline:
- durations = pipeline | 'Durations' >> beam.Create([
- (0, 'annual'),
- (1, 'biennial'),
- (2, 'perennial'),
- ])
-
- plant_details = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 2},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 1},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 2},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 0},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 2},
- ])
- | 'Replace duration' >> beam.Map(
- replace_duration,
- durations=beam.pvalue.AsDict(durations),
- )
- | beam.Map(print)
- )
- # [END map_side_inputs_dict]
- if test:
- test(plant_details)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map_test.py
deleted file mode 100644
index 5fcee8a..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map_test.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-from . import map
-
-
-def check_plants(actual):
- # [START plants]
- plants = [
- 'πStrawberry',
- 'π₯Carrot',
- 'πEggplant',
- 'π
Tomato',
- 'π₯Potato',
- ]
- # [END plants]
- assert_that(actual, equal_to(plants))
-
-
-def check_plant_details(actual):
- # [START plant_details]
- plant_details = [
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ]
- # [END plant_details]
- assert_that(actual, equal_to(plant_details))
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.map.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class MapTest(unittest.TestCase):
- def test_map_simple(self):
- map.map_simple(check_plants)
-
- def test_map_function(self):
- map.map_function(check_plants)
-
- def test_map_lambda(self):
- map.map_lambda(check_plants)
-
- def test_map_multiple_arguments(self):
- map.map_multiple_arguments(check_plants)
-
- def test_map_tuple(self):
- map.map_tuple(check_plants)
-
- def test_map_side_inputs_singleton(self):
- map.map_side_inputs_singleton(check_plants)
-
- def test_map_side_inputs_iter(self):
- map.map_side_inputs_iter(check_plants)
-
- def test_map_side_inputs_dict(self):
- map.map_side_inputs_dict(check_plant_details)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py
deleted file mode 100644
index 971e9f0..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py
+++ /dev/null
@@ -1,126 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-from __future__ import unicode_literals
-
-
-def pardo_dofn(test=None):
- # [START pardo_dofn]
- import apache_beam as beam
-
- class SplitWords(beam.DoFn):
- def __init__(self, delimiter=','):
- self.delimiter = delimiter
-
- def process(self, text):
- for word in text.split(self.delimiter):
- yield word
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- 'πStrawberry,π₯Carrot,πEggplant',
- 'π
Tomato,π₯Potato',
- ])
- | 'Split words' >> beam.ParDo(SplitWords(','))
- | beam.Map(print)
- )
- # [END pardo_dofn]
- if test:
- test(plants)
-
-
-def pardo_dofn_params(test=None):
- # pylint: disable=line-too-long
- # [START pardo_dofn_params]
- import apache_beam as beam
-
- class AnalyzeElement(beam.DoFn):
- def process(self, elem, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
- yield '\n'.join([
- '# timestamp',
- 'type(timestamp) -> ' + repr(type(timestamp)),
- 'timestamp.micros -> ' + repr(timestamp.micros),
- 'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
- 'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
- '',
- '# window',
- 'type(window) -> ' + repr(type(window)),
- 'window.start -> {} ({})'.format(window.start, window.start.to_utc_datetime()),
- 'window.end -> {} ({})'.format(window.end, window.end.to_utc_datetime()),
- 'window.max_timestamp() -> {} ({})'.format(window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
- ])
-
- with beam.Pipeline() as pipeline:
- dofn_params = (
- pipeline
- | 'Create a single test element' >> beam.Create([':)'])
- | 'Add timestamp (Spring equinox 2020)' >> beam.Map(
- lambda elem: beam.window.TimestampedValue(elem, 1584675660))
- | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
- | 'Analyze element' >> beam.ParDo(AnalyzeElement())
- | beam.Map(print)
- )
- # [END pardo_dofn_params]
- # pylint: enable=line-too-long
- if test:
- test(dofn_params)
-
-
-def pardo_dofn_methods(test=None):
- # [START pardo_dofn_methods]
- import apache_beam as beam
-
- class DoFnMethods(beam.DoFn):
- def __init__(self):
- print('__init__')
- self.window = beam.window.GlobalWindow()
-
- def setup(self):
- print('setup')
-
- def start_bundle(self):
- print('start_bundle')
-
- def process(self, element, window=beam.DoFn.WindowParam):
- self.window = window
- yield '* process: ' + element
-
- def finish_bundle(self):
- yield beam.utils.windowed_value.WindowedValue(
- value='* finish_bundle: π±π³π',
- timestamp=0,
- windows=[self.window],
- )
-
- def teardown(self):
- print('teardown')
-
- with beam.Pipeline() as pipeline:
- results = (
- pipeline
- | 'Create inputs' >> beam.Create(['π', 'π₯', 'π', 'π
', 'π₯'])
- | 'DoFn methods' >> beam.ParDo(DoFnMethods())
- | beam.Map(print)
- )
- # [END pardo_dofn_methods]
- if test:
- return test(results)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py
deleted file mode 100644
index a8de2d0..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py
+++ /dev/null
@@ -1,120 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import io
-import platform
-import sys
-import unittest
-
-import mock
-
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-from . import pardo
-
-
-def check_plants(actual):
- # [START plants]
- plants = [
- 'πStrawberry',
- 'π₯Carrot',
- 'πEggplant',
- 'π
Tomato',
- 'π₯Potato',
- ]
- # [END plants]
- assert_that(actual, equal_to(plants))
-
-
-def check_dofn_params(actual):
- # pylint: disable=line-too-long
- dofn_params = '\n'.join('''[START dofn_params]
-# timestamp
-type(timestamp) -> <class 'apache_beam.utils.timestamp.Timestamp'>
-timestamp.micros -> 1584675660000000
-timestamp.to_rfc3339() -> '2020-03-20T03:41:00Z'
-timestamp.to_utc_datetime() -> datetime.datetime(2020, 3, 20, 3, 41)
-
-# window
-type(window) -> <class 'apache_beam.transforms.window.IntervalWindow'>
-window.start -> Timestamp(1584675660) (2020-03-20 03:41:00)
-window.end -> Timestamp(1584675690) (2020-03-20 03:41:30)
-window.max_timestamp() -> Timestamp(1584675689.999999) (2020-03-20 03:41:29.999999)
-[END dofn_params]'''.splitlines()[1:-1])
- # pylint: enable=line-too-long
- assert_that(actual, equal_to([dofn_params]))
-
-
-def check_dofn_methods(actual):
- # Return the expected stdout to check the ordering of the called methods.
- return '''[START results]
-__init__
-setup
-start_bundle
-* process: π
-* process: π₯
-* process: π
-* process: π
-* process: π₯
-* finish_bundle: π±π³π
-teardown
-[END results]'''.splitlines()[1:-1]
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.pardo.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class ParDoTest(unittest.TestCase):
- def test_pardo_dofn(self):
- pardo.pardo_dofn(check_plants)
-
- # TODO: Remove this after Python 2 deprecation.
- # https://issues.apache.org/jira/browse/BEAM-8124
- @unittest.skipIf(sys.version_info[0] < 3 and platform.system() == 'Windows',
- 'Python 2 on Windows uses `long` rather than `int`')
- def test_pardo_dofn_params(self):
- pardo.pardo_dofn_params(check_dofn_params)
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-@mock.patch('sys.stdout', new_callable=io.StringIO)
-class ParDoStdoutTest(unittest.TestCase):
- def test_pardo_dofn_methods(self, mock_stdout):
- expected = pardo.pardo_dofn_methods(check_dofn_methods)
- actual = mock_stdout.getvalue().splitlines()
-
- # For the stdout, check the ordering of the methods, not of the elements.
- actual_stdout = [line.split(':')[0] for line in actual]
- expected_stdout = [line.split(':')[0] for line in expected]
- self.assertEqual(actual_stdout, expected_stdout)
-
- # For the elements, ignore the stdout and just make sure all elements match.
- actual_elements = {line for line in actual if line.startswith('*')}
- expected_elements = {line for line in expected if line.startswith('*')}
- self.assertEqual(actual_elements, expected_elements)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py
deleted file mode 100644
index 6f839d4..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py
+++ /dev/null
@@ -1,136 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def partition_function(test=None):
- # [START partition_function]
- import apache_beam as beam
-
- durations = ['annual', 'biennial', 'perennial']
-
- def by_duration(plant, num_partitions):
- return durations.index(plant['duration'])
-
- with beam.Pipeline() as pipeline:
- annuals, biennials, perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Partition' >> beam.Partition(by_duration, len(durations))
- )
- _ = (
- annuals
- | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))
- )
- _ = (
- biennials
- | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))
- )
- _ = (
- perennials
- | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))
- )
- # [END partition_function]
- if test:
- test(annuals, biennials, perennials)
-
-
-def partition_lambda(test=None):
- # [START partition_lambda]
- import apache_beam as beam
-
- durations = ['annual', 'biennial', 'perennial']
-
- with beam.Pipeline() as pipeline:
- annuals, biennials, perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Partition' >> beam.Partition(
- lambda plant, num_partitions: durations.index(plant['duration']),
- len(durations),
- )
- )
- _ = (
- annuals
- | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))
- )
- _ = (
- biennials
- | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))
- )
- _ = (
- perennials
- | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))
- )
- # [END partition_lambda]
- if test:
- test(annuals, biennials, perennials)
-
-
-def partition_multiple_arguments(test=None):
- # [START partition_multiple_arguments]
- import apache_beam as beam
- import json
-
- def split_dataset(plant, num_partitions, ratio):
- assert num_partitions == len(ratio)
- bucket = sum(map(ord, json.dumps(plant))) % sum(ratio)
- total = 0
- for i, part in enumerate(ratio):
- total += part
- if bucket < total:
- return i
- return len(ratio) - 1
-
- with beam.Pipeline() as pipeline:
- train_dataset, test_dataset = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Partition' >> beam.Partition(split_dataset, 2, ratio=[8, 2])
- )
- _ = (
- train_dataset
- | 'Train' >> beam.Map(lambda x: print('train: ' + str(x)))
- )
- _ = (
- test_dataset
- | 'Test' >> beam.Map(lambda x: print('test: ' + str(x)))
- )
- # [END partition_multiple_arguments]
- if test:
- test(train_dataset, test_dataset)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py
deleted file mode 100644
index 48f83da..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-from ..element_wise import partition
-
-
-def check_partitions(actual1, actual2, actual3):
- # [START partitions]
- annuals = [
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- ]
- biennials = [
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- ]
- perennials = [
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ]
- # [END partitions]
- assert_that(actual1, equal_to(annuals), label='assert annuals')
- assert_that(actual2, equal_to(biennials), label='assert biennials')
- assert_that(actual3, equal_to(perennials), label='assert perennials')
-
-
-def check_split_datasets(actual1, actual2):
- # [START train_test]
- train_dataset = [
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ]
- test_dataset = [
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- ]
- # [END train_test]
- assert_that(actual1, equal_to(train_dataset), label='assert train')
- assert_that(actual2, equal_to(test_dataset), label='assert test')
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.partition.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class PartitionTest(unittest.TestCase):
- def test_partition_function(self):
- partition.partition_function(check_partitions)
-
- def test_partition_lambda(self):
- partition.partition_lambda(check_partitions)
-
- def test_partition_multiple_arguments(self):
- partition.partition_multiple_arguments(check_split_datasets)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex.py
deleted file mode 100644
index b39b534..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex.py
+++ /dev/null
@@ -1,236 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def regex_matches(test=None):
- # [START regex_matches]
- import apache_beam as beam
-
- # Matches a named group 'icon', and then two comma-separated groups.
- regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
- with beam.Pipeline() as pipeline:
- plants_matches = (
- pipeline
- | 'Garden plants' >> beam.Create([
- 'π, Strawberry, perennial',
- 'π₯, Carrot, biennial ignoring trailing words',
- 'π, Eggplant, perennial',
- 'π
, Tomato, annual',
- 'π₯, Potato, perennial',
- '# π, invalid, format',
- 'invalid, π, format',
- ])
- | 'Parse plants' >> beam.Regex.matches(regex)
- | beam.Map(print)
- )
- # [END regex_matches]
- if test:
- test(plants_matches)
-
-
-def regex_all_matches(test=None):
- # [START regex_all_matches]
- import apache_beam as beam
-
- # Matches a named group 'icon', and then two comma-separated groups.
- regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
- with beam.Pipeline() as pipeline:
- plants_all_matches = (
- pipeline
- | 'Garden plants' >> beam.Create([
- 'π, Strawberry, perennial',
- 'π₯, Carrot, biennial ignoring trailing words',
- 'π, Eggplant, perennial',
- 'π
, Tomato, annual',
- 'π₯, Potato, perennial',
- '# π, invalid, format',
- 'invalid, π, format',
- ])
- | 'Parse plants' >> beam.Regex.all_matches(regex)
- | beam.Map(print)
- )
- # [END regex_all_matches]
- if test:
- test(plants_all_matches)
-
-
-def regex_matches_kv(test=None):
- # [START regex_matches_kv]
- import apache_beam as beam
-
- # Matches a named group 'icon', and then two comma-separated groups.
- regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
- with beam.Pipeline() as pipeline:
- plants_matches_kv = (
- pipeline
- | 'Garden plants' >> beam.Create([
- 'π, Strawberry, perennial',
- 'π₯, Carrot, biennial ignoring trailing words',
- 'π, Eggplant, perennial',
- 'π
, Tomato, annual',
- 'π₯, Potato, perennial',
- '# π, invalid, format',
- 'invalid, π, format',
- ])
- | 'Parse plants' >> beam.Regex.matches_kv(regex, keyGroup='icon')
- | beam.Map(print)
- )
- # [END regex_matches_kv]
- if test:
- test(plants_matches_kv)
-
-
-def regex_find(test=None):
- # [START regex_find]
- import apache_beam as beam
-
- # Matches a named group 'icon', and then two comma-separated groups.
- regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
- with beam.Pipeline() as pipeline:
- plants_matches = (
- pipeline
- | 'Garden plants' >> beam.Create([
- '# π, Strawberry, perennial',
- '# π₯, Carrot, biennial ignoring trailing words',
- '# π, Eggplant, perennial - π, Banana, perennial',
- '# π
, Tomato, annual - π, Watermelon, annual',
- '# π₯, Potato, perennial',
- ])
- | 'Parse plants' >> beam.Regex.find(regex)
- | beam.Map(print)
- )
- # [END regex_find]
- if test:
- test(plants_matches)
-
-
-def regex_find_all(test=None):
- # [START regex_find_all]
- import apache_beam as beam
-
- # Matches a named group 'icon', and then two comma-separated groups.
- regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
- with beam.Pipeline() as pipeline:
- plants_find_all = (
- pipeline
- | 'Garden plants' >> beam.Create([
- '# π, Strawberry, perennial',
- '# π₯, Carrot, biennial ignoring trailing words',
- '# π, Eggplant, perennial - π, Banana, perennial',
- '# π
, Tomato, annual - π, Watermelon, annual',
- '# π₯, Potato, perennial',
- ])
- | 'Parse plants' >> beam.Regex.find_all(regex)
- | beam.Map(print)
- )
- # [END regex_find_all]
- if test:
- test(plants_find_all)
-
-
-def regex_find_kv(test=None):
- # [START regex_find_kv]
- import apache_beam as beam
-
- # Matches a named group 'icon', and then two comma-separated groups.
- regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
- with beam.Pipeline() as pipeline:
- plants_matches_kv = (
- pipeline
- | 'Garden plants' >> beam.Create([
- '# π, Strawberry, perennial',
- '# π₯, Carrot, biennial ignoring trailing words',
- '# π, Eggplant, perennial - π, Banana, perennial',
- '# π
, Tomato, annual - π, Watermelon, annual',
- '# π₯, Potato, perennial',
- ])
- | 'Parse plants' >> beam.Regex.find_kv(regex, keyGroup='icon')
- | beam.Map(print)
- )
- # [END regex_find_kv]
- if test:
- test(plants_matches_kv)
-
-
-def regex_replace_all(test=None):
- # [START regex_replace_all]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants_replace_all = (
- pipeline
- | 'Garden plants' >> beam.Create([
- 'π : Strawberry : perennial',
- 'π₯ : Carrot : biennial',
- 'π\t:\tEggplant\t:\tperennial',
- 'π
: Tomato : annual',
- 'π₯ : Potato : perennial',
- ])
- | 'To CSV' >> beam.Regex.replace_all(r'\s*:\s*', ',')
- | beam.Map(print)
- )
- # [END regex_replace_all]
- if test:
- test(plants_replace_all)
-
-
-def regex_replace_first(test=None):
- # [START regex_replace_first]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants_replace_first = (
- pipeline
- | 'Garden plants' >> beam.Create([
- 'π, Strawberry, perennial',
- 'π₯, Carrot, biennial',
- 'π,\tEggplant, perennial',
- 'π
, Tomato, annual',
- 'π₯, Potato, perennial',
- ])
- | 'As dictionary' >> beam.Regex.replace_first(r'\s*,\s*', ': ')
- | beam.Map(print)
- )
- # [END regex_replace_first]
- if test:
- test(plants_replace_first)
-
-
-def regex_split(test=None):
- # [START regex_split]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants_split = (
- pipeline
- | 'Garden plants' >> beam.Create([
- 'π : Strawberry : perennial',
- 'π₯ : Carrot : biennial',
- 'π\t:\tEggplant : perennial',
- 'π
: Tomato : annual',
- 'π₯ : Potato : perennial',
- ])
- | 'Parse plants' >> beam.Regex.split(r'\s*:\s*')
- | beam.Map(print)
- )
- # [END regex_split]
- if test:
- test(plants_split)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex_test.py
deleted file mode 100644
index 7e2bf78..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex_test.py
+++ /dev/null
@@ -1,173 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-from . import regex
-
-
-def check_matches(actual):
- # [START plants_matches]
- plants_matches = [
- '🍓, Strawberry, perennial',
- '🥕, Carrot, biennial',
- '🍆, Eggplant, perennial',
- '🍅, Tomato, annual',
- '🥔, Potato, perennial',
- ]
- # [END plants_matches]
- assert_that(actual, equal_to(plants_matches))
-
-
-def check_all_matches(actual):
- # [START plants_all_matches]
- plants_all_matches = [
- ['🍓, Strawberry, perennial', '🍓', 'Strawberry', 'perennial'],
- ['🥕, Carrot, biennial', '🥕', 'Carrot', 'biennial'],
- ['🍆, Eggplant, perennial', '🍆', 'Eggplant', 'perennial'],
- ['🍅, Tomato, annual', '🍅', 'Tomato', 'annual'],
- ['🥔, Potato, perennial', '🥔', 'Potato', 'perennial'],
- ]
- # [END plants_all_matches]
- assert_that(actual, equal_to(plants_all_matches))
-
-
-def check_matches_kv(actual):
- # [START plants_matches_kv]
- plants_matches_kv = [
- ('🍓', '🍓, Strawberry, perennial'),
- ('🥕', '🥕, Carrot, biennial'),
- ('🍆', '🍆, Eggplant, perennial'),
- ('🍅', '🍅, Tomato, annual'),
- ('🥔', '🥔, Potato, perennial'),
- ]
- # [END plants_matches_kv]
- assert_that(actual, equal_to(plants_matches_kv))
-
-
-def check_find_all(actual):
- # [START plants_find_all]
- plants_find_all = [
- ['🍓, Strawberry, perennial'],
- ['🥕, Carrot, biennial'],
- ['🍆, Eggplant, perennial', '🍌, Banana, perennial'],
- ['🍅, Tomato, annual', '🍉, Watermelon, annual'],
- ['🥔, Potato, perennial'],
- ]
- # [END plants_find_all]
- assert_that(actual, equal_to(plants_find_all))
-
-
-def check_find_kv(actual):
- # [START plants_find_kv]
- plants_find_all = [
- ('🍓', '🍓, Strawberry, perennial'),
- ('🥕', '🥕, Carrot, biennial'),
- ('🍆', '🍆, Eggplant, perennial'),
- ('🍌', '🍌, Banana, perennial'),
- ('🍅', '🍅, Tomato, annual'),
- ('🍉', '🍉, Watermelon, annual'),
- ('🥔', '🥔, Potato, perennial'),
- ]
- # [END plants_find_kv]
- assert_that(actual, equal_to(plants_find_all))
-
-
-def check_replace_all(actual):
- # [START plants_replace_all]
- plants_replace_all = [
- '🍓,Strawberry,perennial',
- '🥕,Carrot,biennial',
- '🍆,Eggplant,perennial',
- '🍅,Tomato,annual',
- '🥔,Potato,perennial',
- ]
- # [END plants_replace_all]
- assert_that(actual, equal_to(plants_replace_all))
-
-
-def check_replace_first(actual):
- # [START plants_replace_first]
- plants_replace_first = [
- '🍓: Strawberry, perennial',
- '🥕: Carrot, biennial',
- '🍆: Eggplant, perennial',
- '🍅: Tomato, annual',
- '🥔: Potato, perennial',
- ]
- # [END plants_replace_first]
- assert_that(actual, equal_to(plants_replace_first))
-
-
-def check_split(actual):
- # [START plants_split]
- plants_split = [
- ['🍓', 'Strawberry', 'perennial'],
- ['🥕', 'Carrot', 'biennial'],
- ['🍆', 'Eggplant', 'perennial'],
- ['🍅', 'Tomato', 'annual'],
- ['🥔', 'Potato', 'perennial'],
- ]
- # [END plants_split]
- assert_that(actual, equal_to(plants_split))
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.regex.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class RegexTest(unittest.TestCase):
- def test_matches(self):
- regex.regex_matches(check_matches)
-
- def test_all_matches(self):
- regex.regex_all_matches(check_all_matches)
-
- def test_matches_kv(self):
- regex.regex_matches_kv(check_matches_kv)
-
- def test_find(self):
- regex.regex_find(check_matches)
-
- def test_find_all(self):
- regex.regex_find_all(check_find_all)
-
- def test_find_kv(self):
- regex.regex_find_kv(check_find_kv)
-
- def test_replace_all(self):
- regex.regex_replace_all(check_replace_all)
-
- def test_replace_first(self):
- regex.regex_replace_first(check_replace_first)
-
- def test_split(self):
- regex.regex_split(check_split)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values.py
deleted file mode 100644
index 8504ff4..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def values(test=None):
- # [START values]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Garden plants' >> beam.Create([
- ('🍓', 'Strawberry'),
- ('🥕', 'Carrot'),
- ('🍆', 'Eggplant'),
- ('🍅', 'Tomato'),
- ('🥔', 'Potato'),
- ])
- | 'Values' >> beam.Values()
- | beam.Map(print)
- )
- # [END values]
- if test:
- test(plants)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values_test.py
deleted file mode 100644
index b43d911..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values_test.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.examples.snippets.transforms.element_wise.values import *
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.values.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class ValuesTest(unittest.TestCase):
- def __init__(self, methodName):
- super(ValuesTest, self).__init__(methodName)
- # [START plants]
- plants = [
- 'Strawberry',
- 'Carrot',
- 'Eggplant',
- 'Tomato',
- 'Potato',
- ]
- # [END plants]
- self.plants_test = lambda actual: assert_that(actual, equal_to(plants))
-
- def test_values(self):
- values(self.plants_test)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 3e42485..4548488 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -119,7 +119,10 @@
'oauth2client>=2.0.1,<4',
'protobuf>=3.5.0.post1,<4',
# [BEAM-6287] pyarrow is not supported on Windows for Python 2
- ('pyarrow>=0.11.1,<0.15.0; python_version >= "3.0" or '
+ # [BEAM-8392] pyarrow 0.14.0 and 0.15.0 trigger an exception when importing
+ # apache_beam on macOS 10.15. Update version bounds as soon as the fix is
+ # ready.
+ ('pyarrow>=0.11.1,<0.14.0; python_version >= "3.0" or '
'platform_system != "Windows"'),
'pydot>=1.2.0,<2',
'python-dateutil>=2.8.0,<3',