Merge pull request #9773 from RyanSkraba/BEAM-7073-avro-sql-unit-test

[BEAM-7073]: Add unit test for Avro logical type datum.
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index 24edb28..b6e040c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -70,9 +70,6 @@
     }
 
     PortablePipelineOptions portableOptions = flinkOptions.as(PortablePipelineOptions.class);
-    if (portableOptions.getSdkWorkerParallelism() == 0L) {
-      portableOptions.setSdkWorkerParallelism(serverConfig.getSdkWorkerParallelism());
-    }
 
     PortablePipelineRunner pipelineRunner;
     if (portableOptions.getOutputExecutablePath() == null
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
index a90f7e6..1e345d0 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkJobServerDriverTest.java
@@ -43,7 +43,6 @@
     assertThat(config.getArtifactPort(), is(8098));
     assertThat(config.getExpansionPort(), is(8097));
     assertThat(config.getFlinkMasterUrl(), is("[auto]"));
-    assertThat(config.getSdkWorkerParallelism(), is(1L));
     assertThat(config.isCleanArtifactsPerJob(), is(true));
     FlinkJobServerDriver flinkJobServerDriver = FlinkJobServerDriver.fromConfig(config);
     assertThat(flinkJobServerDriver, is(not(nullValue())));
@@ -62,7 +61,6 @@
               "--expansion-port",
               "44",
               "--flink-master-url=jobmanager",
-              "--sdk-worker-parallelism=4",
               "--clean-artifacts-per-job=false",
             });
     FlinkJobServerDriver.FlinkServerConfiguration config =
@@ -72,7 +70,6 @@
     assertThat(config.getArtifactPort(), is(43));
     assertThat(config.getExpansionPort(), is(44));
     assertThat(config.getFlinkMasterUrl(), is("jobmanager"));
-    assertThat(config.getSdkWorkerParallelism(), is(4L));
     assertThat(config.isCleanArtifactsPerJob(), is(false));
   }
 
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
index f8977ff..0c5bf94 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
@@ -94,15 +94,6 @@
         handler = ExplicitBooleanOptionHandler.class)
     private boolean cleanArtifactsPerJob = true;
 
-    @Option(
-        name = "--sdk-worker-parallelism",
-        usage =
-            "Default parallelism for SDK worker processes. This option is only applied when the "
-                + "pipeline option sdkWorkerParallelism is set to 0."
-                + "Default is 1, If 0, worker parallelism will be dynamically decided by runner."
-                + "See also: sdkWorkerParallelism Pipeline Option")
-    private long sdkWorkerParallelism = 1L;
-
     public String getHost() {
       return host;
     }
@@ -126,10 +117,6 @@
     public boolean isCleanArtifactsPerJob() {
       return cleanArtifactsPerJob;
     }
-
-    public long getSdkWorkerParallelism() {
-      return this.sdkWorkerParallelism;
-    }
   }
 
   protected static ServerFactory createJobServerFactory(ServerConfiguration configuration) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index b9d7bd7..e65d903 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -32,12 +32,15 @@
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.extensions.sql.meta.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.transforms.Select;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -151,12 +154,37 @@
     }
 
     @Override
+    public PCollection<Row> buildIOReader(
+        PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+      PCollection<Row> withAllFields = buildIOReader(begin);
+      if (fieldNames.isEmpty() && filters instanceof DefaultTableFilter) {
+        return withAllFields;
+      }
+
+      PCollection<Row> result = withAllFields;
+      if (!(filters instanceof DefaultTableFilter)) {
+        throw new RuntimeException("Unimplemented at the moment.");
+      }
+
+      if (!fieldNames.isEmpty()) {
+        result = result.apply(Select.fieldNames(fieldNames.toArray(new String[0])));
+      }
+
+      return result;
+    }
+
+    @Override
     public POutput buildIOWriter(PCollection<Row> input) {
       input.apply(ParDo.of(new CollectorFn(tableWithRows)));
       return PDone.in(input.getPipeline());
     }
 
     @Override
+    public boolean supportsProjects() {
+      return true;
+    }
+
+    @Override
     public Schema getSchema() {
       return tableWithRows.table.getSchema();
     }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java
new file mode 100644
index 0000000..c65d593
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.test;
+
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestTableProviderTest {
+  private static final Schema BASIC_SCHEMA =
+      Schema.builder().addInt32Field("id").addStringField("name").build();
+  private BeamSqlTable beamSqlTable;
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Before
+  public void buildUp() {
+    TestTableProvider tableProvider = new TestTableProvider();
+    Table table = getTable("tableName");
+    tableProvider.createTable(table);
+    tableProvider.addRows(
+        table.getName(), row(BASIC_SCHEMA, 1, "one"), row(BASIC_SCHEMA, 2, "two"));
+
+    beamSqlTable = tableProvider.buildBeamSqlTable(table);
+  }
+
+  @Test
+  public void testInMemoryTableProvider_returnsSelectedColumns() {
+    PCollection<Row> result =
+        beamSqlTable.buildIOReader(
+            pipeline.begin(),
+            beamSqlTable.constructFilter(ImmutableList.of()),
+            ImmutableList.of("name"));
+
+    PAssert.that(result)
+        .containsInAnyOrder(row(result.getSchema(), "one"), row(result.getSchema(), "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testInMemoryTableProvider_withEmptySelectedColumns_returnsAllColumns() {
+    PCollection<Row> result =
+        beamSqlTable.buildIOReader(
+            pipeline.begin(), beamSqlTable.constructFilter(ImmutableList.of()), ImmutableList.of());
+
+    PAssert.that(result)
+        .containsInAnyOrder(row(result.getSchema(), 1, "one"), row(result.getSchema(), 2, "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testInMemoryTableProvider_withAllSelectedColumns_returnsAllColumns() {
+    PCollection<Row> result =
+        beamSqlTable.buildIOReader(
+            pipeline.begin(),
+            beamSqlTable.constructFilter(ImmutableList.of()),
+            ImmutableList.of("name", "id")); // Note that order is ignored
+
+    // Selected columns are output in the same order as they are listed in the schema.
+    PAssert.that(result)
+        .containsInAnyOrder(row(result.getSchema(), 1, "one"), row(result.getSchema(), 2, "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testInMemoryTableProvider_withDuplicateSelectedColumns_returnsSelectedColumnsOnce() {
+    PCollection<Row> result =
+        beamSqlTable.buildIOReader(
+            pipeline.begin(),
+            beamSqlTable.constructFilter(ImmutableList.of()),
+            ImmutableList.of("name", "name"));
+
+    PAssert.that(result)
+        .containsInAnyOrder(row(result.getSchema(), "one"), row(result.getSchema(), "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  private static Row row(Schema schema, Object... objects) {
+    return Row.withSchema(schema).addValues(objects).build();
+  }
+
+  private static Table getTable(String name) {
+    return Table.builder()
+        .name(name)
+        .comment(name + " table")
+        .schema(BASIC_SCHEMA)
+        .type("test")
+        .build();
+  }
+}
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
index 79f9db0..30add9b 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
@@ -36,7 +36,9 @@
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.core.type.WritableTypeId;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.JsonSerializer;
@@ -156,8 +158,9 @@
         SerializerProvider serializers,
         TypeSerializer typeSerializer)
         throws IOException {
-      typeSerializer.writeTypePrefixForObject(credentialsProvider, jsonGenerator);
-
+      WritableTypeId typeId =
+          typeSerializer.writeTypePrefix(
+              jsonGenerator, typeSerializer.typeId(credentialsProvider, JsonToken.START_OBJECT));
       if (credentialsProvider.getClass().equals(AWSStaticCredentialsProvider.class)) {
         jsonGenerator.writeStringField(
             AWS_ACCESS_KEY_ID, credentialsProvider.getCredentials().getAWSAccessKeyId());
@@ -197,7 +200,7 @@
         throw new IllegalArgumentException(
             "Unsupported AWS credentials provider type " + credentialsProvider.getClass());
       }
-      typeSerializer.writeTypeSuffixForObject(credentialsProvider, jsonGenerator);
+      typeSerializer.writeTypeSuffix(jsonGenerator, typeId);
     }
   }
 
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java
index 4adad67..79e8d15 100644
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java
@@ -20,7 +20,9 @@
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.core.type.WritableTypeId;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.JsonSerializer;
@@ -139,7 +141,9 @@
         SerializerProvider serializer,
         TypeSerializer typeSerializer)
         throws IOException {
-      typeSerializer.writeTypePrefixForObject(credentialsProvider, jsonGenerator);
+      WritableTypeId typeId =
+          typeSerializer.writeTypePrefix(
+              jsonGenerator, typeSerializer.typeId(credentialsProvider, JsonToken.START_OBJECT));
       if (credentialsProvider.getClass().equals(StaticCredentialsProvider.class)) {
         jsonGenerator.writeStringField(
             ACCESS_KEY_ID, credentialsProvider.resolveCredentials().accessKeyId());
@@ -149,7 +153,7 @@
         throw new IllegalArgumentException(
             "Unsupported AWS credentials provider type " + credentialsProvider.getClass());
       }
-      typeSerializer.writeTypeSuffixForObject(credentialsProvider, jsonGenerator);
+      typeSerializer.writeTypeSuffix(jsonGenerator, typeId);
     }
   }
 
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/__init__.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/__init__.py
deleted file mode 100644
index 6569e3f..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py
deleted file mode 100644
index 44b11b8..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py
+++ /dev/null
@@ -1,182 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def filter_function(test=None):
-  # [START filter_function]
-  import apache_beam as beam
-
-  def is_perennial(plant):
-    return plant['duration'] == 'perennial'
-
-  with beam.Pipeline() as pipeline:
-    perennials = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
-            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
-            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
-            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
-            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
-        ])
-        | 'Filter perennials' >> beam.Filter(is_perennial)
-        | beam.Map(print)
-    )
-    # [END filter_function]
-    if test:
-      test(perennials)
-
-
-def filter_lambda(test=None):
-  # [START filter_lambda]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    perennials = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
-            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
-            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
-            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
-            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
-        ])
-        | 'Filter perennials' >> beam.Filter(
-            lambda plant: plant['duration'] == 'perennial')
-        | beam.Map(print)
-    )
-    # [END filter_lambda]
-    if test:
-      test(perennials)
-
-
-def filter_multiple_arguments(test=None):
-  # [START filter_multiple_arguments]
-  import apache_beam as beam
-
-  def has_duration(plant, duration):
-    return plant['duration'] == duration
-
-  with beam.Pipeline() as pipeline:
-    perennials = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
-            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
-            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
-            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
-            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
-        ])
-        | 'Filter perennials' >> beam.Filter(has_duration, 'perennial')
-        | beam.Map(print)
-    )
-    # [END filter_multiple_arguments]
-    if test:
-      test(perennials)
-
-
-def filter_side_inputs_singleton(test=None):
-  # [START filter_side_inputs_singleton]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    perennial = pipeline | 'Perennial' >> beam.Create(['perennial'])
-
-    perennials = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
-            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
-            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
-            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
-            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
-        ])
-        | 'Filter perennials' >> beam.Filter(
-            lambda plant, duration: plant['duration'] == duration,
-            duration=beam.pvalue.AsSingleton(perennial),
-        )
-        | beam.Map(print)
-    )
-    # [END filter_side_inputs_singleton]
-    if test:
-      test(perennials)
-
-
-def filter_side_inputs_iter(test=None):
-  # [START filter_side_inputs_iter]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    valid_durations = pipeline | 'Valid durations' >> beam.Create([
-        'annual',
-        'biennial',
-        'perennial',
-    ])
-
-    valid_plants = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
-            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
-            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
-            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
-            {'icon': '🥔', 'name': 'Potato', 'duration': 'PERENNIAL'},
-        ])
-        | 'Filter valid plants' >> beam.Filter(
-            lambda plant, valid_durations: plant['duration'] in valid_durations,
-            valid_durations=beam.pvalue.AsIter(valid_durations),
-        )
-        | beam.Map(print)
-    )
-    # [END filter_side_inputs_iter]
-    if test:
-      test(valid_plants)
-
-
-def filter_side_inputs_dict(test=None):
-  # [START filter_side_inputs_dict]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    keep_duration = pipeline | 'Duration filters' >> beam.Create([
-        ('annual', False),
-        ('biennial', False),
-        ('perennial', True),
-    ])
-
-    perennials = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
-            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
-            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
-            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
-            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
-        ])
-        | 'Filter plants by duration' >> beam.Filter(
-            lambda plant, keep_duration: keep_duration[plant['duration']],
-            keep_duration=beam.pvalue.AsDict(keep_duration),
-        )
-        | beam.Map(print)
-    )
-    # [END filter_side_inputs_dict]
-    if test:
-      test(perennials)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py
deleted file mode 100644
index 02da146..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.examples.snippets.transforms.element_wise.filter import *
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.filter.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class FilterTest(unittest.TestCase):
-  def __init__(self, methodName):
-    super(FilterTest, self).__init__(methodName)
-    # [START perennials]
-    perennials = [
-        {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
-        {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
-        {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
-    ]
-    # [END perennials]
-    self.perennials_test = lambda actual: \
-        assert_that(actual, equal_to(perennials))
-
-    # [START valid_plants]
-    valid_plants = [
-        {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
-        {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
-        {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
-        {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
-    ]
-    # [END valid_plants]
-    self.valid_plants_test = lambda actual: \
-        assert_that(actual, equal_to(valid_plants))
-
-  def test_filter_function(self):
-    filter_function(self.perennials_test)
-
-  def test_filter_lambda(self):
-    filter_lambda(self.perennials_test)
-
-  def test_filter_multiple_arguments(self):
-    filter_multiple_arguments(self.perennials_test)
-
-  def test_filter_side_inputs_singleton(self):
-    filter_side_inputs_singleton(self.perennials_test)
-
-  def test_filter_side_inputs_iter(self):
-    filter_side_inputs_iter(self.valid_plants_test)
-
-  def test_filter_side_inputs_dict(self):
-    filter_side_inputs_dict(self.perennials_test)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys.py
deleted file mode 100644
index 01c9d6b..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def keys(test=None):
-  # [START keys]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    icons = (
-        pipeline
-        | 'Garden plants' >> beam.Create([
-            ('🍓', 'Strawberry'),
-            ('🥕', 'Carrot'),
-            ('🍆', 'Eggplant'),
-            ('🍅', 'Tomato'),
-            ('🥔', 'Potato'),
-        ])
-        | 'Keys' >> beam.Keys()
-        | beam.Map(print)
-    )
-    # [END keys]
-    if test:
-      test(icons)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys_test.py
deleted file mode 100644
index 9cb4909..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys_test.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.examples.snippets.transforms.element_wise.keys import *
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.keys.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class KeysTest(unittest.TestCase):
-  def __init__(self, methodName):
-    super(KeysTest, self).__init__(methodName)
-    # [START icons]
-    icons = [
-        '🍓',
-        '🥕',
-        '🍆',
-        '🍅',
-        '🥔',
-    ]
-    # [END icons]
-    self.icons_test = lambda actual: assert_that(actual, equal_to(icons))
-
-  def test_keys(self):
-    keys(self.icons_test)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap.py
deleted file mode 100644
index 2107fd5..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def kvswap(test=None):
-  # [START kvswap]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    plants = (
-        pipeline
-        | 'Garden plants' >> beam.Create([
-            ('🍓', 'Strawberry'),
-            ('🥕', 'Carrot'),
-            ('🍆', 'Eggplant'),
-            ('🍅', 'Tomato'),
-            ('🥔', 'Potato'),
-        ])
-        | 'Key-Value swap' >> beam.KvSwap()
-        | beam.Map(print)
-    )
-    # [END kvswap]
-    if test:
-      test(plants)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap_test.py
deleted file mode 100644
index 85fa9dc..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap_test.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.examples.snippets.transforms.element_wise.kvswap import *
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.kvswap.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class KvSwapTest(unittest.TestCase):
-  def __init__(self, methodName):
-    super(KvSwapTest, self).__init__(methodName)
-    # [START plants]
-    plants = [
-        ('Strawberry', 'πŸ“'),
-        ('Carrot', 'πŸ₯•'),
-        ('Eggplant', 'πŸ†'),
-        ('Tomato', 'πŸ…'),
-        ('Potato', 'πŸ₯”'),
-    ]
-    # [END plants]
-    self.plants_test = lambda actual: assert_that(actual, equal_to(plants))
-
-  def test_kvswap(self):
-    kvswap(self.plants_test)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map.py
deleted file mode 100644
index 9defd47..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map.py
+++ /dev/null
@@ -1,226 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def map_simple(test=None):
-  # [START map_simple]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    plants = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            '   πŸ“Strawberry   \n',
-            '   πŸ₯•Carrot   \n',
-            '   πŸ†Eggplant   \n',
-            '   πŸ…Tomato   \n',
-            '   πŸ₯”Potato   \n',
-        ])
-        | 'Strip' >> beam.Map(str.strip)
-        | beam.Map(print)
-    )
-    # [END map_simple]
-    if test:
-      test(plants)
-
-
-def map_function(test=None):
-  # [START map_function]
-  import apache_beam as beam
-
-  def strip_header_and_newline(text):
-    return text.strip('# \n')
-
-  with beam.Pipeline() as pipeline:
-    plants = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            '# πŸ“Strawberry\n',
-            '# πŸ₯•Carrot\n',
-            '# πŸ†Eggplant\n',
-            '# πŸ…Tomato\n',
-            '# πŸ₯”Potato\n',
-        ])
-        | 'Strip header' >> beam.Map(strip_header_and_newline)
-        | beam.Map(print)
-    )
-    # [END map_function]
-    if test:
-      test(plants)
-
-
-def map_lambda(test=None):
-  # [START map_lambda]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    plants = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            '# πŸ“Strawberry\n',
-            '# πŸ₯•Carrot\n',
-            '# πŸ†Eggplant\n',
-            '# πŸ…Tomato\n',
-            '# πŸ₯”Potato\n',
-        ])
-        | 'Strip header' >> beam.Map(lambda text: text.strip('# \n'))
-        | beam.Map(print)
-    )
-    # [END map_lambda]
-    if test:
-      test(plants)
-
-
-def map_multiple_arguments(test=None):
-  # [START map_multiple_arguments]
-  import apache_beam as beam
-
-  def strip(text, chars=None):
-    return text.strip(chars)
-
-  with beam.Pipeline() as pipeline:
-    plants = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            '# πŸ“Strawberry\n',
-            '# πŸ₯•Carrot\n',
-            '# πŸ†Eggplant\n',
-            '# πŸ…Tomato\n',
-            '# πŸ₯”Potato\n',
-        ])
-        | 'Strip header' >> beam.Map(strip, chars='# \n')
-        | beam.Map(print)
-    )
-    # [END map_multiple_arguments]
-    if test:
-      test(plants)
-
-
-def map_tuple(test=None):
-  # [START map_tuple]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    plants = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            ('πŸ“', 'Strawberry'),
-            ('πŸ₯•', 'Carrot'),
-            ('πŸ†', 'Eggplant'),
-            ('πŸ…', 'Tomato'),
-            ('πŸ₯”', 'Potato'),
-        ])
-        | 'Format' >> beam.MapTuple(
-            lambda icon, plant: '{}{}'.format(icon, plant))
-        | beam.Map(print)
-    )
-    # [END map_tuple]
-    if test:
-      test(plants)
-
-
-def map_side_inputs_singleton(test=None):
-  # [START map_side_inputs_singleton]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    chars = pipeline | 'Create chars' >> beam.Create(['# \n'])
-
-    plants = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            '# πŸ“Strawberry\n',
-            '# πŸ₯•Carrot\n',
-            '# πŸ†Eggplant\n',
-            '# πŸ…Tomato\n',
-            '# πŸ₯”Potato\n',
-        ])
-        | 'Strip header' >> beam.Map(
-            lambda text, chars: text.strip(chars),
-            chars=beam.pvalue.AsSingleton(chars),
-        )
-        | beam.Map(print)
-    )
-    # [END map_side_inputs_singleton]
-    if test:
-      test(plants)
-
-
-def map_side_inputs_iter(test=None):
-  # [START map_side_inputs_iter]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    chars = pipeline | 'Create chars' >> beam.Create(['#', ' ', '\n'])
-
-    plants = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            '# πŸ“Strawberry\n',
-            '# πŸ₯•Carrot\n',
-            '# πŸ†Eggplant\n',
-            '# πŸ…Tomato\n',
-            '# πŸ₯”Potato\n',
-        ])
-        | 'Strip header' >> beam.Map(
-            lambda text, chars: text.strip(''.join(chars)),
-            chars=beam.pvalue.AsIter(chars),
-        )
-        | beam.Map(print)
-    )
-    # [END map_side_inputs_iter]
-    if test:
-      test(plants)
-
-
-def map_side_inputs_dict(test=None):
-  # [START map_side_inputs_dict]
-  import apache_beam as beam
-
-  def replace_duration(plant, durations):
-    plant['duration'] = durations[plant['duration']]
-    return plant
-
-  with beam.Pipeline() as pipeline:
-    durations = pipeline | 'Durations' >> beam.Create([
-        (0, 'annual'),
-        (1, 'biennial'),
-        (2, 'perennial'),
-    ])
-
-    plant_details = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 2},
-            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 1},
-            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 2},
-            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 0},
-            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 2},
-        ])
-        | 'Replace duration' >> beam.Map(
-            replace_duration,
-            durations=beam.pvalue.AsDict(durations),
-        )
-        | beam.Map(print)
-    )
-    # [END map_side_inputs_dict]
-    if test:
-      test(plant_details)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map_test.py
deleted file mode 100644
index 5fcee8a..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map_test.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-from . import map
-
-
-def check_plants(actual):
-  # [START plants]
-  plants = [
-      'πŸ“Strawberry',
-      'πŸ₯•Carrot',
-      'πŸ†Eggplant',
-      'πŸ…Tomato',
-      'πŸ₯”Potato',
-  ]
-  # [END plants]
-  assert_that(actual, equal_to(plants))
-
-
-def check_plant_details(actual):
-  # [START plant_details]
-  plant_details = [
-      {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
-      {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
-      {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
-      {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
-      {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
-  ]
-  # [END plant_details]
-  assert_that(actual, equal_to(plant_details))
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.map.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class MapTest(unittest.TestCase):
-  def test_map_simple(self):
-    map.map_simple(check_plants)
-
-  def test_map_function(self):
-    map.map_function(check_plants)
-
-  def test_map_lambda(self):
-    map.map_lambda(check_plants)
-
-  def test_map_multiple_arguments(self):
-    map.map_multiple_arguments(check_plants)
-
-  def test_map_tuple(self):
-    map.map_tuple(check_plants)
-
-  def test_map_side_inputs_singleton(self):
-    map.map_side_inputs_singleton(check_plants)
-
-  def test_map_side_inputs_iter(self):
-    map.map_side_inputs_iter(check_plants)
-
-  def test_map_side_inputs_dict(self):
-    map.map_side_inputs_dict(check_plant_details)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py
deleted file mode 100644
index 971e9f0..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py
+++ /dev/null
@@ -1,126 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-from __future__ import unicode_literals
-
-
-def pardo_dofn(test=None):
-  # [START pardo_dofn]
-  import apache_beam as beam
-
-  class SplitWords(beam.DoFn):
-    def __init__(self, delimiter=','):
-      self.delimiter = delimiter
-
-    def process(self, text):
-      for word in text.split(self.delimiter):
-        yield word
-
-  with beam.Pipeline() as pipeline:
-    plants = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            'πŸ“Strawberry,πŸ₯•Carrot,πŸ†Eggplant',
-            'πŸ…Tomato,πŸ₯”Potato',
-        ])
-        | 'Split words' >> beam.ParDo(SplitWords(','))
-        | beam.Map(print)
-    )
-    # [END pardo_dofn]
-    if test:
-      test(plants)
-
-
-def pardo_dofn_params(test=None):
-  # pylint: disable=line-too-long
-  # [START pardo_dofn_params]
-  import apache_beam as beam
-
-  class AnalyzeElement(beam.DoFn):
-    def process(self, elem, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
-      yield '\n'.join([
-          '# timestamp',
-          'type(timestamp) -> ' + repr(type(timestamp)),
-          'timestamp.micros -> ' + repr(timestamp.micros),
-          'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
-          'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
-          '',
-          '# window',
-          'type(window) -> ' + repr(type(window)),
-          'window.start -> {} ({})'.format(window.start, window.start.to_utc_datetime()),
-          'window.end -> {} ({})'.format(window.end, window.end.to_utc_datetime()),
-          'window.max_timestamp() -> {} ({})'.format(window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
-      ])
-
-  with beam.Pipeline() as pipeline:
-    dofn_params = (
-        pipeline
-        | 'Create a single test element' >> beam.Create([':)'])
-        | 'Add timestamp (Spring equinox 2020)' >> beam.Map(
-            lambda elem: beam.window.TimestampedValue(elem, 1584675660))
-        | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
-        | 'Analyze element' >> beam.ParDo(AnalyzeElement())
-        | beam.Map(print)
-    )
-    # [END pardo_dofn_params]
-    # pylint: enable=line-too-long
-    if test:
-      test(dofn_params)
-
-
-def pardo_dofn_methods(test=None):
-  # [START pardo_dofn_methods]
-  import apache_beam as beam
-
-  class DoFnMethods(beam.DoFn):
-    def __init__(self):
-      print('__init__')
-      self.window = beam.window.GlobalWindow()
-
-    def setup(self):
-      print('setup')
-
-    def start_bundle(self):
-      print('start_bundle')
-
-    def process(self, element, window=beam.DoFn.WindowParam):
-      self.window = window
-      yield '* process: ' + element
-
-    def finish_bundle(self):
-      yield beam.utils.windowed_value.WindowedValue(
-          value='* finish_bundle: 🌱🌳🌍',
-          timestamp=0,
-          windows=[self.window],
-      )
-
-    def teardown(self):
-      print('teardown')
-
-  with beam.Pipeline() as pipeline:
-    results = (
-        pipeline
-        | 'Create inputs' >> beam.Create(['πŸ“', 'πŸ₯•', 'πŸ†', 'πŸ…', 'πŸ₯”'])
-        | 'DoFn methods' >> beam.ParDo(DoFnMethods())
-        | beam.Map(print)
-    )
-    # [END pardo_dofn_methods]
-    if test:
-      return test(results)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py
deleted file mode 100644
index a8de2d0..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py
+++ /dev/null
@@ -1,120 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import io
-import platform
-import sys
-import unittest
-
-import mock
-
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-from . import pardo
-
-
-def check_plants(actual):
-  # [START plants]
-  plants = [
-      'πŸ“Strawberry',
-      'πŸ₯•Carrot',
-      'πŸ†Eggplant',
-      'πŸ…Tomato',
-      'πŸ₯”Potato',
-  ]
-  # [END plants]
-  assert_that(actual, equal_to(plants))
-
-
-def check_dofn_params(actual):
-  # pylint: disable=line-too-long
-  dofn_params = '\n'.join('''[START dofn_params]
-# timestamp
-type(timestamp) -> <class 'apache_beam.utils.timestamp.Timestamp'>
-timestamp.micros -> 1584675660000000
-timestamp.to_rfc3339() -> '2020-03-20T03:41:00Z'
-timestamp.to_utc_datetime() -> datetime.datetime(2020, 3, 20, 3, 41)
-
-# window
-type(window) -> <class 'apache_beam.transforms.window.IntervalWindow'>
-window.start -> Timestamp(1584675660) (2020-03-20 03:41:00)
-window.end -> Timestamp(1584675690) (2020-03-20 03:41:30)
-window.max_timestamp() -> Timestamp(1584675689.999999) (2020-03-20 03:41:29.999999)
-[END dofn_params]'''.splitlines()[1:-1])
-  # pylint: enable=line-too-long
-  assert_that(actual, equal_to([dofn_params]))
-
-
-def check_dofn_methods(actual):
-  # Return the expected stdout to check the ordering of the called methods.
-  return '''[START results]
-__init__
-setup
-start_bundle
-* process: πŸ“
-* process: πŸ₯•
-* process: πŸ†
-* process: πŸ…
-* process: πŸ₯”
-* finish_bundle: 🌱🌳🌍
-teardown
-[END results]'''.splitlines()[1:-1]
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.pardo.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class ParDoTest(unittest.TestCase):
-  def test_pardo_dofn(self):
-    pardo.pardo_dofn(check_plants)
-
-  # TODO: Remove this after Python 2 deprecation.
-  # https://issues.apache.org/jira/browse/BEAM-8124
-  @unittest.skipIf(sys.version_info[0] < 3 and platform.system() == 'Windows',
-                   'Python 2 on Windows uses `long` rather than `int`')
-  def test_pardo_dofn_params(self):
-    pardo.pardo_dofn_params(check_dofn_params)
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-@mock.patch('sys.stdout', new_callable=io.StringIO)
-class ParDoStdoutTest(unittest.TestCase):
-  def test_pardo_dofn_methods(self, mock_stdout):
-    expected = pardo.pardo_dofn_methods(check_dofn_methods)
-    actual = mock_stdout.getvalue().splitlines()
-
-    # For the stdout, check the ordering of the methods, not of the elements.
-    actual_stdout = [line.split(':')[0] for line in actual]
-    expected_stdout = [line.split(':')[0] for line in expected]
-    self.assertEqual(actual_stdout, expected_stdout)
-
-    # For the elements, ignore the stdout and just make sure all elements match.
-    actual_elements = {line for line in actual if line.startswith('*')}
-    expected_elements = {line for line in expected if line.startswith('*')}
-    self.assertEqual(actual_elements, expected_elements)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py
deleted file mode 100644
index 6f839d4..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py
+++ /dev/null
@@ -1,136 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def partition_function(test=None):
-  # [START partition_function]
-  import apache_beam as beam
-
-  durations = ['annual', 'biennial', 'perennial']
-
-  def by_duration(plant, num_partitions):
-    return durations.index(plant['duration'])
-
-  with beam.Pipeline() as pipeline:
-    annuals, biennials, perennials = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
-            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
-            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
-            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
-            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
-        ])
-        | 'Partition' >> beam.Partition(by_duration, len(durations))
-    )
-    _ = (
-        annuals
-        | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))
-    )
-    _ = (
-        biennials
-        | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))
-    )
-    _ = (
-        perennials
-        | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))
-    )
-    # [END partition_function]
-    if test:
-      test(annuals, biennials, perennials)
-
-
-def partition_lambda(test=None):
-  # [START partition_lambda]
-  import apache_beam as beam
-
-  durations = ['annual', 'biennial', 'perennial']
-
-  with beam.Pipeline() as pipeline:
-    annuals, biennials, perennials = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
-            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
-            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
-            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
-            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
-        ])
-        | 'Partition' >> beam.Partition(
-            lambda plant, num_partitions: durations.index(plant['duration']),
-            len(durations),
-        )
-    )
-    _ = (
-        annuals
-        | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))
-    )
-    _ = (
-        biennials
-        | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))
-    )
-    _ = (
-        perennials
-        | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))
-    )
-    # [END partition_lambda]
-    if test:
-      test(annuals, biennials, perennials)
-
-
-def partition_multiple_arguments(test=None):
-  # [START partition_multiple_arguments]
-  import apache_beam as beam
-  import json
-
-  def split_dataset(plant, num_partitions, ratio):
-    assert num_partitions == len(ratio)
-    bucket = sum(map(ord, json.dumps(plant))) % sum(ratio)
-    total = 0
-    for i, part in enumerate(ratio):
-      total += part
-      if bucket < total:
-        return i
-    return len(ratio) - 1
-
-  with beam.Pipeline() as pipeline:
-    train_dataset, test_dataset = (
-        pipeline
-        | 'Gardening plants' >> beam.Create([
-            {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
-            {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
-            {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
-            {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
-            {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
-        ])
-        | 'Partition' >> beam.Partition(split_dataset, 2, ratio=[8, 2])
-    )
-    _ = (
-        train_dataset
-        | 'Train' >> beam.Map(lambda x: print('train: ' + str(x)))
-    )
-    _ = (
-        test_dataset
-        | 'Test'  >> beam.Map(lambda x: print('test: ' + str(x)))
-    )
-    # [END partition_multiple_arguments]
-    if test:
-      test(train_dataset, test_dataset)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py
deleted file mode 100644
index 48f83da..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-from ..element_wise import partition
-
-
-def check_partitions(actual1, actual2, actual3):
-  # [START partitions]
-  annuals = [
-      {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
-  ]
-  biennials = [
-      {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
-  ]
-  perennials = [
-      {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
-      {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
-      {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
-  ]
-  # [END partitions]
-  assert_that(actual1, equal_to(annuals), label='assert annuals')
-  assert_that(actual2, equal_to(biennials), label='assert biennials')
-  assert_that(actual3, equal_to(perennials), label='assert perennials')
-
-
-def check_split_datasets(actual1, actual2):
-  # [START train_test]
-  train_dataset = [
-      {'icon': 'πŸ“', 'name': 'Strawberry', 'duration': 'perennial'},
-      {'icon': 'πŸ₯•', 'name': 'Carrot', 'duration': 'biennial'},
-      {'icon': 'πŸ₯”', 'name': 'Potato', 'duration': 'perennial'},
-  ]
-  test_dataset = [
-      {'icon': 'πŸ†', 'name': 'Eggplant', 'duration': 'perennial'},
-      {'icon': 'πŸ…', 'name': 'Tomato', 'duration': 'annual'},
-  ]
-  # [END train_test]
-  assert_that(actual1, equal_to(train_dataset), label='assert train')
-  assert_that(actual2, equal_to(test_dataset), label='assert test')
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.partition.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class PartitionTest(unittest.TestCase):
-  def test_partition_function(self):
-    partition.partition_function(check_partitions)
-
-  def test_partition_lambda(self):
-    partition.partition_lambda(check_partitions)
-
-  def test_partition_multiple_arguments(self):
-    partition.partition_multiple_arguments(check_split_datasets)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex.py
deleted file mode 100644
index b39b534..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex.py
+++ /dev/null
@@ -1,236 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def regex_matches(test=None):
-  # [START regex_matches]
-  import apache_beam as beam
-
-  # Matches a named group 'icon', and then two comma-separated groups.
-  regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
-  with beam.Pipeline() as pipeline:
-    plants_matches = (
-        pipeline
-        | 'Garden plants' >> beam.Create([
-            'πŸ“, Strawberry, perennial',
-            'πŸ₯•, Carrot, biennial ignoring trailing words',
-            'πŸ†, Eggplant, perennial',
-            'πŸ…, Tomato, annual',
-            'πŸ₯”, Potato, perennial',
-            '# 🍌, invalid, format',
-            'invalid, πŸ‰, format',
-        ])
-        | 'Parse plants' >> beam.Regex.matches(regex)
-        | beam.Map(print)
-    )
-    # [END regex_matches]
-    if test:
-      test(plants_matches)
-
-
-def regex_all_matches(test=None):
-  # [START regex_all_matches]
-  import apache_beam as beam
-
-  # Matches a named group 'icon', and then two comma-separated groups.
-  regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
-  with beam.Pipeline() as pipeline:
-    plants_all_matches = (
-        pipeline
-        | 'Garden plants' >> beam.Create([
-            'πŸ“, Strawberry, perennial',
-            'πŸ₯•, Carrot, biennial ignoring trailing words',
-            'πŸ†, Eggplant, perennial',
-            'πŸ…, Tomato, annual',
-            'πŸ₯”, Potato, perennial',
-            '# 🍌, invalid, format',
-            'invalid, πŸ‰, format',
-        ])
-        | 'Parse plants' >> beam.Regex.all_matches(regex)
-        | beam.Map(print)
-    )
-    # [END regex_all_matches]
-    if test:
-      test(plants_all_matches)
-
-
-def regex_matches_kv(test=None):
-  # [START regex_matches_kv]
-  import apache_beam as beam
-
-  # Matches a named group 'icon', and then two comma-separated groups.
-  regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
-  with beam.Pipeline() as pipeline:
-    plants_matches_kv = (
-        pipeline
-        | 'Garden plants' >> beam.Create([
-            'πŸ“, Strawberry, perennial',
-            'πŸ₯•, Carrot, biennial ignoring trailing words',
-            'πŸ†, Eggplant, perennial',
-            'πŸ…, Tomato, annual',
-            'πŸ₯”, Potato, perennial',
-            '# 🍌, invalid, format',
-            'invalid, πŸ‰, format',
-        ])
-        | 'Parse plants' >> beam.Regex.matches_kv(regex, keyGroup='icon')
-        | beam.Map(print)
-    )
-    # [END regex_matches_kv]
-    if test:
-      test(plants_matches_kv)
-
-
-def regex_find(test=None):
-  # [START regex_find]
-  import apache_beam as beam
-
-  # Matches a named group 'icon', and then two comma-separated groups.
-  regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
-  with beam.Pipeline() as pipeline:
-    plants_matches = (
-        pipeline
-        | 'Garden plants' >> beam.Create([
-            '# πŸ“, Strawberry, perennial',
-            '# πŸ₯•, Carrot, biennial ignoring trailing words',
-            '# πŸ†, Eggplant, perennial - 🍌, Banana, perennial',
-            '# πŸ…, Tomato, annual - πŸ‰, Watermelon, annual',
-            '# πŸ₯”, Potato, perennial',
-        ])
-        | 'Parse plants' >> beam.Regex.find(regex)
-        | beam.Map(print)
-    )
-    # [END regex_find]
-    if test:
-      test(plants_matches)
-
-
-def regex_find_all(test=None):
-  # [START regex_find_all]
-  import apache_beam as beam
-
-  # Matches a named group 'icon', and then two comma-separated groups.
-  regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
-  with beam.Pipeline() as pipeline:
-    plants_find_all = (
-        pipeline
-        | 'Garden plants' >> beam.Create([
-            '# πŸ“, Strawberry, perennial',
-            '# πŸ₯•, Carrot, biennial ignoring trailing words',
-            '# πŸ†, Eggplant, perennial - 🍌, Banana, perennial',
-            '# πŸ…, Tomato, annual - πŸ‰, Watermelon, annual',
-            '# πŸ₯”, Potato, perennial',
-        ])
-        | 'Parse plants' >> beam.Regex.find_all(regex)
-        | beam.Map(print)
-    )
-    # [END regex_find_all]
-    if test:
-      test(plants_find_all)
-
-
-def regex_find_kv(test=None):
-  # [START regex_find_kv]
-  import apache_beam as beam
-
-  # Matches a named group 'icon', and then two comma-separated groups.
-  regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
-  with beam.Pipeline() as pipeline:
-    plants_matches_kv = (
-        pipeline
-        | 'Garden plants' >> beam.Create([
-            '# πŸ“, Strawberry, perennial',
-            '# πŸ₯•, Carrot, biennial ignoring trailing words',
-            '# πŸ†, Eggplant, perennial - 🍌, Banana, perennial',
-            '# πŸ…, Tomato, annual - πŸ‰, Watermelon, annual',
-            '# πŸ₯”, Potato, perennial',
-        ])
-        | 'Parse plants' >> beam.Regex.find_kv(regex, keyGroup='icon')
-        | beam.Map(print)
-    )
-    # [END regex_find_kv]
-    if test:
-      test(plants_matches_kv)
-
-
-def regex_replace_all(test=None):
-  # [START regex_replace_all]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    plants_replace_all = (
-        pipeline
-        | 'Garden plants' >> beam.Create([
-            'πŸ“ : Strawberry : perennial',
-            'πŸ₯• : Carrot : biennial',
-            'πŸ†\t:\tEggplant\t:\tperennial',
-            'πŸ… : Tomato : annual',
-            'πŸ₯” : Potato : perennial',
-        ])
-        | 'To CSV' >> beam.Regex.replace_all(r'\s*:\s*', ',')
-        | beam.Map(print)
-    )
-    # [END regex_replace_all]
-    if test:
-      test(plants_replace_all)
-
-
-def regex_replace_first(test=None):
-  # [START regex_replace_first]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    plants_replace_first = (
-        pipeline
-        | 'Garden plants' >> beam.Create([
-            'πŸ“, Strawberry, perennial',
-            'πŸ₯•, Carrot, biennial',
-            'πŸ†,\tEggplant, perennial',
-            'πŸ…, Tomato, annual',
-            'πŸ₯”, Potato, perennial',
-        ])
-        | 'As dictionary' >> beam.Regex.replace_first(r'\s*,\s*', ': ')
-        | beam.Map(print)
-    )
-    # [END regex_replace_first]
-    if test:
-      test(plants_replace_first)
-
-
-def regex_split(test=None):
-  # [START regex_split]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    plants_split = (
-        pipeline
-        | 'Garden plants' >> beam.Create([
-            'πŸ“ : Strawberry : perennial',
-            'πŸ₯• : Carrot : biennial',
-            'πŸ†\t:\tEggplant : perennial',
-            'πŸ… : Tomato : annual',
-            'πŸ₯” : Potato : perennial',
-        ])
-        | 'Parse plants' >> beam.Regex.split(r'\s*:\s*')
-        | beam.Map(print)
-    )
-    # [END regex_split]
-    if test:
-      test(plants_split)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex_test.py
deleted file mode 100644
index 7e2bf78..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex_test.py
+++ /dev/null
@@ -1,173 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-from . import regex
-
-
-def check_matches(actual):
-  # [START plants_matches]
-  plants_matches = [
-      'πŸ“, Strawberry, perennial',
-      'πŸ₯•, Carrot, biennial',
-      'πŸ†, Eggplant, perennial',
-      'πŸ…, Tomato, annual',
-      'πŸ₯”, Potato, perennial',
-  ]
-  # [END plants_matches]
-  assert_that(actual, equal_to(plants_matches))
-
-
-def check_all_matches(actual):
-  # [START plants_all_matches]
-  plants_all_matches = [
-      ['πŸ“, Strawberry, perennial', 'πŸ“', 'Strawberry', 'perennial'],
-      ['πŸ₯•, Carrot, biennial', 'πŸ₯•', 'Carrot', 'biennial'],
-      ['πŸ†, Eggplant, perennial', 'πŸ†', 'Eggplant', 'perennial'],
-      ['πŸ…, Tomato, annual', 'πŸ…', 'Tomato', 'annual'],
-      ['πŸ₯”, Potato, perennial', 'πŸ₯”', 'Potato', 'perennial'],
-  ]
-  # [END plants_all_matches]
-  assert_that(actual, equal_to(plants_all_matches))
-
-
-def check_matches_kv(actual):
-  # [START plants_matches_kv]
-  plants_matches_kv = [
-      ('πŸ“', 'πŸ“, Strawberry, perennial'),
-      ('πŸ₯•', 'πŸ₯•, Carrot, biennial'),
-      ('πŸ†', 'πŸ†, Eggplant, perennial'),
-      ('πŸ…', 'πŸ…, Tomato, annual'),
-      ('πŸ₯”', 'πŸ₯”, Potato, perennial'),
-  ]
-  # [END plants_matches_kv]
-  assert_that(actual, equal_to(plants_matches_kv))
-
-
-def check_find_all(actual):
-  # [START plants_find_all]
-  plants_find_all = [
-      ['πŸ“, Strawberry, perennial'],
-      ['πŸ₯•, Carrot, biennial'],
-      ['πŸ†, Eggplant, perennial', '🍌, Banana, perennial'],
-      ['πŸ…, Tomato, annual', 'πŸ‰, Watermelon, annual'],
-      ['πŸ₯”, Potato, perennial'],
-  ]
-  # [END plants_find_all]
-  assert_that(actual, equal_to(plants_find_all))
-
-
-def check_find_kv(actual):
-  # [START plants_find_kv]
-  plants_find_all = [
-      ('πŸ“', 'πŸ“, Strawberry, perennial'),
-      ('πŸ₯•', 'πŸ₯•, Carrot, biennial'),
-      ('πŸ†', 'πŸ†, Eggplant, perennial'),
-      ('🍌', '🍌, Banana, perennial'),
-      ('πŸ…', 'πŸ…, Tomato, annual'),
-      ('πŸ‰', 'πŸ‰, Watermelon, annual'),
-      ('πŸ₯”', 'πŸ₯”, Potato, perennial'),
-  ]
-  # [END plants_find_kv]
-  assert_that(actual, equal_to(plants_find_all))
-
-
-def check_replace_all(actual):
-  # [START plants_replace_all]
-  plants_replace_all = [
-      'πŸ“,Strawberry,perennial',
-      'πŸ₯•,Carrot,biennial',
-      'πŸ†,Eggplant,perennial',
-      'πŸ…,Tomato,annual',
-      'πŸ₯”,Potato,perennial',
-  ]
-  # [END plants_replace_all]
-  assert_that(actual, equal_to(plants_replace_all))
-
-
-def check_replace_first(actual):
-  # [START plants_replace_first]
-  plants_replace_first = [
-      'πŸ“: Strawberry, perennial',
-      'πŸ₯•: Carrot, biennial',
-      'πŸ†: Eggplant, perennial',
-      'πŸ…: Tomato, annual',
-      'πŸ₯”: Potato, perennial',
-  ]
-  # [END plants_replace_first]
-  assert_that(actual, equal_to(plants_replace_first))
-
-
-def check_split(actual):
-  # [START plants_split]
-  plants_split = [
-      ['πŸ“', 'Strawberry', 'perennial'],
-      ['πŸ₯•', 'Carrot', 'biennial'],
-      ['πŸ†', 'Eggplant', 'perennial'],
-      ['πŸ…', 'Tomato', 'annual'],
-      ['πŸ₯”', 'Potato', 'perennial'],
-  ]
-  # [END plants_split]
-  assert_that(actual, equal_to(plants_split))
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.regex.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class RegexTest(unittest.TestCase):
-  def test_matches(self):
-    regex.regex_matches(check_matches)
-
-  def test_all_matches(self):
-    regex.regex_all_matches(check_all_matches)
-
-  def test_matches_kv(self):
-    regex.regex_matches_kv(check_matches_kv)
-
-  def test_find(self):
-    regex.regex_find(check_matches)
-
-  def test_find_all(self):
-    regex.regex_find_all(check_find_all)
-
-  def test_find_kv(self):
-    regex.regex_find_kv(check_find_kv)
-
-  def test_replace_all(self):
-    regex.regex_replace_all(check_replace_all)
-
-  def test_replace_first(self):
-    regex.regex_replace_first(check_replace_first)
-
-  def test_split(self):
-    regex.regex_split(check_split)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values.py
deleted file mode 100644
index 8504ff4..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def values(test=None):
-  # [START values]
-  import apache_beam as beam
-
-  with beam.Pipeline() as pipeline:
-    plants = (
-        pipeline
-        | 'Garden plants' >> beam.Create([
-            ('πŸ“', 'Strawberry'),
-            ('πŸ₯•', 'Carrot'),
-            ('πŸ†', 'Eggplant'),
-            ('πŸ…', 'Tomato'),
-            ('πŸ₯”', 'Potato'),
-        ])
-        | 'Values' >> beam.Values()
-        | beam.Map(print)
-    )
-    # [END values]
-    if test:
-      test(plants)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values_test.py
deleted file mode 100644
index b43d911..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values_test.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.examples.snippets.transforms.element_wise.values import *
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.values.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class ValuesTest(unittest.TestCase):
-  def __init__(self, methodName):
-    super(ValuesTest, self).__init__(methodName)
-    # [START plants]
-    plants = [
-        'Strawberry',
-        'Carrot',
-        'Eggplant',
-        'Tomato',
-        'Potato',
-    ]
-    # [END plants]
-    self.plants_test = lambda actual: assert_that(actual, equal_to(plants))
-
-  def test_values(self):
-    values(self.plants_test)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 3e42485..4548488 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -119,7 +119,10 @@
     'oauth2client>=2.0.1,<4',
     'protobuf>=3.5.0.post1,<4',
     # [BEAM-6287] pyarrow is not supported on Windows for Python 2
-    ('pyarrow>=0.11.1,<0.15.0; python_version >= "3.0" or '
+    # [BEAM-8392] pyarrow 0.14.0 and 0.15.0 triggers an exception when importing
+    # apache_beam on macOS 10.15. Update version bounds as soon as the fix is
+    # ready
+    ('pyarrow>=0.11.1,<0.14.0; python_version >= "3.0" or '
      'platform_system != "Windows"'),
     'pydot>=1.2.0,<2',
     'python-dateutil>=2.8.0,<3',