Merge pull request #9786 from davidcavazos/remove-old-element-wise-snippets
[BEAM-7389] Remove old element-wise snippets directory
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/__init__.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/__init__.py
deleted file mode 100644
index 6569e3f..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py
deleted file mode 100644
index 44b11b8..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter.py
+++ /dev/null
@@ -1,182 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def filter_function(test=None):
- # [START filter_function]
- import apache_beam as beam
-
- def is_perennial(plant):
- return plant['duration'] == 'perennial'
-
- with beam.Pipeline() as pipeline:
- perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Filter perennials' >> beam.Filter(is_perennial)
- | beam.Map(print)
- )
- # [END filter_function]
- if test:
- test(perennials)
-
-
-def filter_lambda(test=None):
- # [START filter_lambda]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Filter perennials' >> beam.Filter(
- lambda plant: plant['duration'] == 'perennial')
- | beam.Map(print)
- )
- # [END filter_lambda]
- if test:
- test(perennials)
-
-
-def filter_multiple_arguments(test=None):
- # [START filter_multiple_arguments]
- import apache_beam as beam
-
- def has_duration(plant, duration):
- return plant['duration'] == duration
-
- with beam.Pipeline() as pipeline:
- perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Filter perennials' >> beam.Filter(has_duration, 'perennial')
- | beam.Map(print)
- )
- # [END filter_multiple_arguments]
- if test:
- test(perennials)
-
-
-def filter_side_inputs_singleton(test=None):
- # [START filter_side_inputs_singleton]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- perennial = pipeline | 'Perennial' >> beam.Create(['perennial'])
-
- perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Filter perennials' >> beam.Filter(
- lambda plant, duration: plant['duration'] == duration,
- duration=beam.pvalue.AsSingleton(perennial),
- )
- | beam.Map(print)
- )
- # [END filter_side_inputs_singleton]
- if test:
- test(perennials)
-
-
-def filter_side_inputs_iter(test=None):
- # [START filter_side_inputs_iter]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- valid_durations = pipeline | 'Valid durations' >> beam.Create([
- 'annual',
- 'biennial',
- 'perennial',
- ])
-
- valid_plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'PERENNIAL'},
- ])
- | 'Filter valid plants' >> beam.Filter(
- lambda plant, valid_durations: plant['duration'] in valid_durations,
- valid_durations=beam.pvalue.AsIter(valid_durations),
- )
- | beam.Map(print)
- )
- # [END filter_side_inputs_iter]
- if test:
- test(valid_plants)
-
-
-def filter_side_inputs_dict(test=None):
- # [START filter_side_inputs_dict]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- keep_duration = pipeline | 'Duration filters' >> beam.Create([
- ('annual', False),
- ('biennial', False),
- ('perennial', True),
- ])
-
- perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Filter plants by duration' >> beam.Filter(
- lambda plant, keep_duration: keep_duration[plant['duration']],
- keep_duration=beam.pvalue.AsDict(keep_duration),
- )
- | beam.Map(print)
- )
- # [END filter_side_inputs_dict]
- if test:
- test(perennials)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py
deleted file mode 100644
index 02da146..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/filter_test.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.examples.snippets.transforms.element_wise.filter import *
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.filter.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class FilterTest(unittest.TestCase):
- def __init__(self, methodName):
- super(FilterTest, self).__init__(methodName)
- # [START perennials]
- perennials = [
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ]
- # [END perennials]
- self.perennials_test = lambda actual: \
- assert_that(actual, equal_to(perennials))
-
- # [START valid_plants]
- valid_plants = [
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- ]
- # [END valid_plants]
- self.valid_plants_test = lambda actual: \
- assert_that(actual, equal_to(valid_plants))
-
- def test_filter_function(self):
- filter_function(self.perennials_test)
-
- def test_filter_lambda(self):
- filter_lambda(self.perennials_test)
-
- def test_filter_multiple_arguments(self):
- filter_multiple_arguments(self.perennials_test)
-
- def test_filter_side_inputs_singleton(self):
- filter_side_inputs_singleton(self.perennials_test)
-
- def test_filter_side_inputs_iter(self):
- filter_side_inputs_iter(self.valid_plants_test)
-
- def test_filter_side_inputs_dict(self):
- filter_side_inputs_dict(self.perennials_test)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys.py
deleted file mode 100644
index 01c9d6b..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def keys(test=None):
- # [START keys]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- icons = (
- pipeline
- | 'Garden plants' >> beam.Create([
- ('π', 'Strawberry'),
- ('π₯', 'Carrot'),
- ('π', 'Eggplant'),
- ('π
', 'Tomato'),
- ('π₯', 'Potato'),
- ])
- | 'Keys' >> beam.Keys()
- | beam.Map(print)
- )
- # [END keys]
- if test:
- test(icons)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys_test.py
deleted file mode 100644
index 9cb4909..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/keys_test.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.examples.snippets.transforms.element_wise.keys import *
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.keys.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class KeysTest(unittest.TestCase):
- def __init__(self, methodName):
- super(KeysTest, self).__init__(methodName)
- # [START icons]
- icons = [
- 'π',
- 'π₯',
- 'π',
- 'π
',
- 'π₯',
- ]
- # [END icons]
- self.icons_test = lambda actual: assert_that(actual, equal_to(icons))
-
- def test_keys(self):
- keys(self.icons_test)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap.py
deleted file mode 100644
index 2107fd5..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def kvswap(test=None):
- # [START kvswap]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Garden plants' >> beam.Create([
- ('π', 'Strawberry'),
- ('π₯', 'Carrot'),
- ('π', 'Eggplant'),
- ('π
', 'Tomato'),
- ('π₯', 'Potato'),
- ])
- | 'Key-Value swap' >> beam.KvSwap()
- | beam.Map(print)
- )
- # [END kvswap]
- if test:
- test(plants)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap_test.py
deleted file mode 100644
index 85fa9dc..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/kvswap_test.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.examples.snippets.transforms.element_wise.kvswap import *
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.kvswap.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class KvSwapTest(unittest.TestCase):
- def __init__(self, methodName):
- super(KvSwapTest, self).__init__(methodName)
- # [START plants]
- plants = [
- ('Strawberry', 'π'),
- ('Carrot', 'π₯'),
- ('Eggplant', 'π'),
- ('Tomato', 'π
'),
- ('Potato', 'π₯'),
- ]
- # [END plants]
- self.plants_test = lambda actual: assert_that(actual, equal_to(plants))
-
- def test_kvswap(self):
- kvswap(self.plants_test)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map.py
deleted file mode 100644
index 9defd47..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map.py
+++ /dev/null
@@ -1,226 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def map_simple(test=None):
- # [START map_simple]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- ' πStrawberry \n',
- ' π₯Carrot \n',
- ' πEggplant \n',
- ' π
Tomato \n',
- ' π₯Potato \n',
- ])
- | 'Strip' >> beam.Map(str.strip)
- | beam.Map(print)
- )
- # [END map_simple]
- if test:
- test(plants)
-
-
-def map_function(test=None):
- # [START map_function]
- import apache_beam as beam
-
- def strip_header_and_newline(text):
- return text.strip('# \n')
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- '# πStrawberry\n',
- '# π₯Carrot\n',
- '# πEggplant\n',
- '# π
Tomato\n',
- '# π₯Potato\n',
- ])
- | 'Strip header' >> beam.Map(strip_header_and_newline)
- | beam.Map(print)
- )
- # [END map_function]
- if test:
- test(plants)
-
-
-def map_lambda(test=None):
- # [START map_lambda]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- '# πStrawberry\n',
- '# π₯Carrot\n',
- '# πEggplant\n',
- '# π
Tomato\n',
- '# π₯Potato\n',
- ])
- | 'Strip header' >> beam.Map(lambda text: text.strip('# \n'))
- | beam.Map(print)
- )
- # [END map_lambda]
- if test:
- test(plants)
-
-
-def map_multiple_arguments(test=None):
- # [START map_multiple_arguments]
- import apache_beam as beam
-
- def strip(text, chars=None):
- return text.strip(chars)
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- '# πStrawberry\n',
- '# π₯Carrot\n',
- '# πEggplant\n',
- '# π
Tomato\n',
- '# π₯Potato\n',
- ])
- | 'Strip header' >> beam.Map(strip, chars='# \n')
- | beam.Map(print)
- )
- # [END map_multiple_arguments]
- if test:
- test(plants)
-
-
-def map_tuple(test=None):
- # [START map_tuple]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- ('π', 'Strawberry'),
- ('π₯', 'Carrot'),
- ('π', 'Eggplant'),
- ('π
', 'Tomato'),
- ('π₯', 'Potato'),
- ])
- | 'Format' >> beam.MapTuple(
- lambda icon, plant: '{}{}'.format(icon, plant))
- | beam.Map(print)
- )
- # [END map_tuple]
- if test:
- test(plants)
-
-
-def map_side_inputs_singleton(test=None):
- # [START map_side_inputs_singleton]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- chars = pipeline | 'Create chars' >> beam.Create(['# \n'])
-
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- '# πStrawberry\n',
- '# π₯Carrot\n',
- '# πEggplant\n',
- '# π
Tomato\n',
- '# π₯Potato\n',
- ])
- | 'Strip header' >> beam.Map(
- lambda text, chars: text.strip(chars),
- chars=beam.pvalue.AsSingleton(chars),
- )
- | beam.Map(print)
- )
- # [END map_side_inputs_singleton]
- if test:
- test(plants)
-
-
-def map_side_inputs_iter(test=None):
- # [START map_side_inputs_iter]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- chars = pipeline | 'Create chars' >> beam.Create(['#', ' ', '\n'])
-
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- '# πStrawberry\n',
- '# π₯Carrot\n',
- '# πEggplant\n',
- '# π
Tomato\n',
- '# π₯Potato\n',
- ])
- | 'Strip header' >> beam.Map(
- lambda text, chars: text.strip(''.join(chars)),
- chars=beam.pvalue.AsIter(chars),
- )
- | beam.Map(print)
- )
- # [END map_side_inputs_iter]
- if test:
- test(plants)
-
-
-def map_side_inputs_dict(test=None):
- # [START map_side_inputs_dict]
- import apache_beam as beam
-
- def replace_duration(plant, durations):
- plant['duration'] = durations[plant['duration']]
- return plant
-
- with beam.Pipeline() as pipeline:
- durations = pipeline | 'Durations' >> beam.Create([
- (0, 'annual'),
- (1, 'biennial'),
- (2, 'perennial'),
- ])
-
- plant_details = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 2},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 1},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 2},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 0},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 2},
- ])
- | 'Replace duration' >> beam.Map(
- replace_duration,
- durations=beam.pvalue.AsDict(durations),
- )
- | beam.Map(print)
- )
- # [END map_side_inputs_dict]
- if test:
- test(plant_details)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map_test.py
deleted file mode 100644
index 5fcee8a..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/map_test.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-from . import map
-
-
-def check_plants(actual):
- # [START plants]
- plants = [
- 'πStrawberry',
- 'π₯Carrot',
- 'πEggplant',
- 'π
Tomato',
- 'π₯Potato',
- ]
- # [END plants]
- assert_that(actual, equal_to(plants))
-
-
-def check_plant_details(actual):
- # [START plant_details]
- plant_details = [
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ]
- # [END plant_details]
- assert_that(actual, equal_to(plant_details))
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.map.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class MapTest(unittest.TestCase):
- def test_map_simple(self):
- map.map_simple(check_plants)
-
- def test_map_function(self):
- map.map_function(check_plants)
-
- def test_map_lambda(self):
- map.map_lambda(check_plants)
-
- def test_map_multiple_arguments(self):
- map.map_multiple_arguments(check_plants)
-
- def test_map_tuple(self):
- map.map_tuple(check_plants)
-
- def test_map_side_inputs_singleton(self):
- map.map_side_inputs_singleton(check_plants)
-
- def test_map_side_inputs_iter(self):
- map.map_side_inputs_iter(check_plants)
-
- def test_map_side_inputs_dict(self):
- map.map_side_inputs_dict(check_plant_details)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py
deleted file mode 100644
index 971e9f0..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo.py
+++ /dev/null
@@ -1,126 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-from __future__ import unicode_literals
-
-
-def pardo_dofn(test=None):
- # [START pardo_dofn]
- import apache_beam as beam
-
- class SplitWords(beam.DoFn):
- def __init__(self, delimiter=','):
- self.delimiter = delimiter
-
- def process(self, text):
- for word in text.split(self.delimiter):
- yield word
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- 'πStrawberry,π₯Carrot,πEggplant',
- 'π
Tomato,π₯Potato',
- ])
- | 'Split words' >> beam.ParDo(SplitWords(','))
- | beam.Map(print)
- )
- # [END pardo_dofn]
- if test:
- test(plants)
-
-
-def pardo_dofn_params(test=None):
- # pylint: disable=line-too-long
- # [START pardo_dofn_params]
- import apache_beam as beam
-
- class AnalyzeElement(beam.DoFn):
- def process(self, elem, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
- yield '\n'.join([
- '# timestamp',
- 'type(timestamp) -> ' + repr(type(timestamp)),
- 'timestamp.micros -> ' + repr(timestamp.micros),
- 'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
- 'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
- '',
- '# window',
- 'type(window) -> ' + repr(type(window)),
- 'window.start -> {} ({})'.format(window.start, window.start.to_utc_datetime()),
- 'window.end -> {} ({})'.format(window.end, window.end.to_utc_datetime()),
- 'window.max_timestamp() -> {} ({})'.format(window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
- ])
-
- with beam.Pipeline() as pipeline:
- dofn_params = (
- pipeline
- | 'Create a single test element' >> beam.Create([':)'])
- | 'Add timestamp (Spring equinox 2020)' >> beam.Map(
- lambda elem: beam.window.TimestampedValue(elem, 1584675660))
- | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
- | 'Analyze element' >> beam.ParDo(AnalyzeElement())
- | beam.Map(print)
- )
- # [END pardo_dofn_params]
- # pylint: enable=line-too-long
- if test:
- test(dofn_params)
-
-
-def pardo_dofn_methods(test=None):
- # [START pardo_dofn_methods]
- import apache_beam as beam
-
- class DoFnMethods(beam.DoFn):
- def __init__(self):
- print('__init__')
- self.window = beam.window.GlobalWindow()
-
- def setup(self):
- print('setup')
-
- def start_bundle(self):
- print('start_bundle')
-
- def process(self, element, window=beam.DoFn.WindowParam):
- self.window = window
- yield '* process: ' + element
-
- def finish_bundle(self):
- yield beam.utils.windowed_value.WindowedValue(
- value='* finish_bundle: π±π³π',
- timestamp=0,
- windows=[self.window],
- )
-
- def teardown(self):
- print('teardown')
-
- with beam.Pipeline() as pipeline:
- results = (
- pipeline
- | 'Create inputs' >> beam.Create(['π', 'π₯', 'π', 'π
', 'π₯'])
- | 'DoFn methods' >> beam.ParDo(DoFnMethods())
- | beam.Map(print)
- )
- # [END pardo_dofn_methods]
- if test:
- return test(results)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py
deleted file mode 100644
index a8de2d0..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/pardo_test.py
+++ /dev/null
@@ -1,120 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import io
-import platform
-import sys
-import unittest
-
-import mock
-
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-from . import pardo
-
-
-def check_plants(actual):
- # [START plants]
- plants = [
- 'πStrawberry',
- 'π₯Carrot',
- 'πEggplant',
- 'π
Tomato',
- 'π₯Potato',
- ]
- # [END plants]
- assert_that(actual, equal_to(plants))
-
-
-def check_dofn_params(actual):
- # pylint: disable=line-too-long
- dofn_params = '\n'.join('''[START dofn_params]
-# timestamp
-type(timestamp) -> <class 'apache_beam.utils.timestamp.Timestamp'>
-timestamp.micros -> 1584675660000000
-timestamp.to_rfc3339() -> '2020-03-20T03:41:00Z'
-timestamp.to_utc_datetime() -> datetime.datetime(2020, 3, 20, 3, 41)
-
-# window
-type(window) -> <class 'apache_beam.transforms.window.IntervalWindow'>
-window.start -> Timestamp(1584675660) (2020-03-20 03:41:00)
-window.end -> Timestamp(1584675690) (2020-03-20 03:41:30)
-window.max_timestamp() -> Timestamp(1584675689.999999) (2020-03-20 03:41:29.999999)
-[END dofn_params]'''.splitlines()[1:-1])
- # pylint: enable=line-too-long
- assert_that(actual, equal_to([dofn_params]))
-
-
-def check_dofn_methods(actual):
- # Return the expected stdout to check the ordering of the called methods.
- return '''[START results]
-__init__
-setup
-start_bundle
-* process: π
-* process: π₯
-* process: π
-* process: π
-* process: π₯
-* finish_bundle: π±π³π
-teardown
-[END results]'''.splitlines()[1:-1]
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.pardo.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class ParDoTest(unittest.TestCase):
- def test_pardo_dofn(self):
- pardo.pardo_dofn(check_plants)
-
- # TODO: Remove this after Python 2 deprecation.
- # https://issues.apache.org/jira/browse/BEAM-8124
- @unittest.skipIf(sys.version_info[0] < 3 and platform.system() == 'Windows',
- 'Python 2 on Windows uses `long` rather than `int`')
- def test_pardo_dofn_params(self):
- pardo.pardo_dofn_params(check_dofn_params)
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-@mock.patch('sys.stdout', new_callable=io.StringIO)
-class ParDoStdoutTest(unittest.TestCase):
- def test_pardo_dofn_methods(self, mock_stdout):
- expected = pardo.pardo_dofn_methods(check_dofn_methods)
- actual = mock_stdout.getvalue().splitlines()
-
- # For the stdout, check the ordering of the methods, not of the elements.
- actual_stdout = [line.split(':')[0] for line in actual]
- expected_stdout = [line.split(':')[0] for line in expected]
- self.assertEqual(actual_stdout, expected_stdout)
-
- # For the elements, ignore the stdout and just make sure all elements match.
- actual_elements = {line for line in actual if line.startswith('*')}
- expected_elements = {line for line in expected if line.startswith('*')}
- self.assertEqual(actual_elements, expected_elements)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py
deleted file mode 100644
index 6f839d4..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py
+++ /dev/null
@@ -1,136 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def partition_function(test=None):
- # [START partition_function]
- import apache_beam as beam
-
- durations = ['annual', 'biennial', 'perennial']
-
- def by_duration(plant, num_partitions):
- return durations.index(plant['duration'])
-
- with beam.Pipeline() as pipeline:
- annuals, biennials, perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Partition' >> beam.Partition(by_duration, len(durations))
- )
- _ = (
- annuals
- | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))
- )
- _ = (
- biennials
- | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))
- )
- _ = (
- perennials
- | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))
- )
- # [END partition_function]
- if test:
- test(annuals, biennials, perennials)
-
-
-def partition_lambda(test=None):
- # [START partition_lambda]
- import apache_beam as beam
-
- durations = ['annual', 'biennial', 'perennial']
-
- with beam.Pipeline() as pipeline:
- annuals, biennials, perennials = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Partition' >> beam.Partition(
- lambda plant, num_partitions: durations.index(plant['duration']),
- len(durations),
- )
- )
- _ = (
- annuals
- | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))
- )
- _ = (
- biennials
- | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))
- )
- _ = (
- perennials
- | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))
- )
- # [END partition_lambda]
- if test:
- test(annuals, biennials, perennials)
-
-
-def partition_multiple_arguments(test=None):
- # [START partition_multiple_arguments]
- import apache_beam as beam
- import json
-
- def split_dataset(plant, num_partitions, ratio):
- assert num_partitions == len(ratio)
- bucket = sum(map(ord, json.dumps(plant))) % sum(ratio)
- total = 0
- for i, part in enumerate(ratio):
- total += part
- if bucket < total:
- return i
- return len(ratio) - 1
-
- with beam.Pipeline() as pipeline:
- train_dataset, test_dataset = (
- pipeline
- | 'Gardening plants' >> beam.Create([
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ])
- | 'Partition' >> beam.Partition(split_dataset, 2, ratio=[8, 2])
- )
- _ = (
- train_dataset
- | 'Train' >> beam.Map(lambda x: print('train: ' + str(x)))
- )
- _ = (
- test_dataset
- | 'Test' >> beam.Map(lambda x: print('test: ' + str(x)))
- )
- # [END partition_multiple_arguments]
- if test:
- test(train_dataset, test_dataset)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py
deleted file mode 100644
index 48f83da..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-from ..element_wise import partition
-
-
-def check_partitions(actual1, actual2, actual3):
- # [START partitions]
- annuals = [
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- ]
- biennials = [
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- ]
- perennials = [
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ]
- # [END partitions]
- assert_that(actual1, equal_to(annuals), label='assert annuals')
- assert_that(actual2, equal_to(biennials), label='assert biennials')
- assert_that(actual3, equal_to(perennials), label='assert perennials')
-
-
-def check_split_datasets(actual1, actual2):
- # [START train_test]
- train_dataset = [
- {'icon': 'π', 'name': 'Strawberry', 'duration': 'perennial'},
- {'icon': 'π₯', 'name': 'Carrot', 'duration': 'biennial'},
- {'icon': 'π₯', 'name': 'Potato', 'duration': 'perennial'},
- ]
- test_dataset = [
- {'icon': 'π', 'name': 'Eggplant', 'duration': 'perennial'},
- {'icon': 'π
', 'name': 'Tomato', 'duration': 'annual'},
- ]
- # [END train_test]
- assert_that(actual1, equal_to(train_dataset), label='assert train')
- assert_that(actual2, equal_to(test_dataset), label='assert test')
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.partition.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class PartitionTest(unittest.TestCase):
- def test_partition_function(self):
- partition.partition_function(check_partitions)
-
- def test_partition_lambda(self):
- partition.partition_lambda(check_partitions)
-
- def test_partition_multiple_arguments(self):
- partition.partition_multiple_arguments(check_split_datasets)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex.py
deleted file mode 100644
index b39b534..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex.py
+++ /dev/null
@@ -1,236 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def regex_matches(test=None):
- # [START regex_matches]
- import apache_beam as beam
-
- # Matches a named group 'icon', and then two comma-separated groups.
- regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
- with beam.Pipeline() as pipeline:
- plants_matches = (
- pipeline
- | 'Garden plants' >> beam.Create([
- 'π, Strawberry, perennial',
- 'π₯, Carrot, biennial ignoring trailing words',
- 'π, Eggplant, perennial',
- 'π
, Tomato, annual',
- 'π₯, Potato, perennial',
- '# π, invalid, format',
- 'invalid, π, format',
- ])
- | 'Parse plants' >> beam.Regex.matches(regex)
- | beam.Map(print)
- )
- # [END regex_matches]
- if test:
- test(plants_matches)
-
-
-def regex_all_matches(test=None):
- # [START regex_all_matches]
- import apache_beam as beam
-
- # Matches a named group 'icon', and then two comma-separated groups.
- regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
- with beam.Pipeline() as pipeline:
- plants_all_matches = (
- pipeline
- | 'Garden plants' >> beam.Create([
- 'π, Strawberry, perennial',
- 'π₯, Carrot, biennial ignoring trailing words',
- 'π, Eggplant, perennial',
- 'π
, Tomato, annual',
- 'π₯, Potato, perennial',
- '# π, invalid, format',
- 'invalid, π, format',
- ])
- | 'Parse plants' >> beam.Regex.all_matches(regex)
- | beam.Map(print)
- )
- # [END regex_all_matches]
- if test:
- test(plants_all_matches)
-
-
-def regex_matches_kv(test=None):
- # [START regex_matches_kv]
- import apache_beam as beam
-
- # Matches a named group 'icon', and then two comma-separated groups.
- regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
- with beam.Pipeline() as pipeline:
- plants_matches_kv = (
- pipeline
- | 'Garden plants' >> beam.Create([
- 'π, Strawberry, perennial',
- 'π₯, Carrot, biennial ignoring trailing words',
- 'π, Eggplant, perennial',
- 'π
, Tomato, annual',
- 'π₯, Potato, perennial',
- '# π, invalid, format',
- 'invalid, π, format',
- ])
- | 'Parse plants' >> beam.Regex.matches_kv(regex, keyGroup='icon')
- | beam.Map(print)
- )
- # [END regex_matches_kv]
- if test:
- test(plants_matches_kv)
-
-
-def regex_find(test=None):
- # [START regex_find]
- import apache_beam as beam
-
- # Matches a named group 'icon', and then two comma-separated groups.
- regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
- with beam.Pipeline() as pipeline:
- plants_matches = (
- pipeline
- | 'Garden plants' >> beam.Create([
- '# π, Strawberry, perennial',
- '# π₯, Carrot, biennial ignoring trailing words',
- '# π, Eggplant, perennial - π, Banana, perennial',
- '# π
, Tomato, annual - π, Watermelon, annual',
- '# π₯, Potato, perennial',
- ])
- | 'Parse plants' >> beam.Regex.find(regex)
- | beam.Map(print)
- )
- # [END regex_find]
- if test:
- test(plants_matches)
-
-
-def regex_find_all(test=None):
- # [START regex_find_all]
- import apache_beam as beam
-
- # Matches a named group 'icon', and then two comma-separated groups.
- regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
- with beam.Pipeline() as pipeline:
- plants_find_all = (
- pipeline
- | 'Garden plants' >> beam.Create([
- '# π, Strawberry, perennial',
- '# π₯, Carrot, biennial ignoring trailing words',
- '# π, Eggplant, perennial - π, Banana, perennial',
- '# π
, Tomato, annual - π, Watermelon, annual',
- '# π₯, Potato, perennial',
- ])
- | 'Parse plants' >> beam.Regex.find_all(regex)
- | beam.Map(print)
- )
- # [END regex_find_all]
- if test:
- test(plants_find_all)
-
-
-def regex_find_kv(test=None):
- # [START regex_find_kv]
- import apache_beam as beam
-
- # Matches a named group 'icon', and then two comma-separated groups.
- regex = r'(?P<icon>[^\s,]+), *(\w+), *(\w+)'
- with beam.Pipeline() as pipeline:
- plants_matches_kv = (
- pipeline
- | 'Garden plants' >> beam.Create([
- '# π, Strawberry, perennial',
- '# π₯, Carrot, biennial ignoring trailing words',
- '# π, Eggplant, perennial - π, Banana, perennial',
- '# π
, Tomato, annual - π, Watermelon, annual',
- '# π₯, Potato, perennial',
- ])
- | 'Parse plants' >> beam.Regex.find_kv(regex, keyGroup='icon')
- | beam.Map(print)
- )
- # [END regex_find_kv]
- if test:
- test(plants_matches_kv)
-
-
-def regex_replace_all(test=None):
- # [START regex_replace_all]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants_replace_all = (
- pipeline
- | 'Garden plants' >> beam.Create([
- 'π : Strawberry : perennial',
- 'π₯ : Carrot : biennial',
- 'π\t:\tEggplant\t:\tperennial',
- 'π
: Tomato : annual',
- 'π₯ : Potato : perennial',
- ])
- | 'To CSV' >> beam.Regex.replace_all(r'\s*:\s*', ',')
- | beam.Map(print)
- )
- # [END regex_replace_all]
- if test:
- test(plants_replace_all)
-
-
-def regex_replace_first(test=None):
- # [START regex_replace_first]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants_replace_first = (
- pipeline
- | 'Garden plants' >> beam.Create([
- 'π, Strawberry, perennial',
- 'π₯, Carrot, biennial',
- 'π,\tEggplant, perennial',
- 'π
, Tomato, annual',
- 'π₯, Potato, perennial',
- ])
- | 'As dictionary' >> beam.Regex.replace_first(r'\s*,\s*', ': ')
- | beam.Map(print)
- )
- # [END regex_replace_first]
- if test:
- test(plants_replace_first)
-
-
-def regex_split(test=None):
- # [START regex_split]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants_split = (
- pipeline
- | 'Garden plants' >> beam.Create([
- 'π : Strawberry : perennial',
- 'π₯ : Carrot : biennial',
- 'π\t:\tEggplant : perennial',
- 'π
: Tomato : annual',
- 'π₯ : Potato : perennial',
- ])
- | 'Parse plants' >> beam.Regex.split(r'\s*:\s*')
- | beam.Map(print)
- )
- # [END regex_split]
- if test:
- test(plants_split)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex_test.py
deleted file mode 100644
index 7e2bf78..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/regex_test.py
+++ /dev/null
@@ -1,173 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-from . import regex
-
-
-def check_matches(actual):
- # [START plants_matches]
- plants_matches = [
- 'π, Strawberry, perennial',
- 'π₯, Carrot, biennial',
- 'π, Eggplant, perennial',
- 'π
, Tomato, annual',
- 'π₯, Potato, perennial',
- ]
- # [END plants_matches]
- assert_that(actual, equal_to(plants_matches))
-
-
-def check_all_matches(actual):
- # [START plants_all_matches]
- plants_all_matches = [
- ['π, Strawberry, perennial', 'π', 'Strawberry', 'perennial'],
- ['π₯, Carrot, biennial', 'π₯', 'Carrot', 'biennial'],
- ['π, Eggplant, perennial', 'π', 'Eggplant', 'perennial'],
- ['π
, Tomato, annual', 'π
', 'Tomato', 'annual'],
- ['π₯, Potato, perennial', 'π₯', 'Potato', 'perennial'],
- ]
- # [END plants_all_matches]
- assert_that(actual, equal_to(plants_all_matches))
-
-
-def check_matches_kv(actual):
- # [START plants_matches_kv]
- plants_matches_kv = [
- ('π', 'π, Strawberry, perennial'),
- ('π₯', 'π₯, Carrot, biennial'),
- ('π', 'π, Eggplant, perennial'),
- ('π
', 'π
, Tomato, annual'),
- ('π₯', 'π₯, Potato, perennial'),
- ]
- # [END plants_matches_kv]
- assert_that(actual, equal_to(plants_matches_kv))
-
-
-def check_find_all(actual):
- # [START plants_find_all]
- plants_find_all = [
- ['π, Strawberry, perennial'],
- ['π₯, Carrot, biennial'],
- ['π, Eggplant, perennial', 'π, Banana, perennial'],
- ['π
, Tomato, annual', 'π, Watermelon, annual'],
- ['π₯, Potato, perennial'],
- ]
- # [END plants_find_all]
- assert_that(actual, equal_to(plants_find_all))
-
-
-def check_find_kv(actual):
- # [START plants_find_kv]
- plants_find_all = [
- ('π', 'π, Strawberry, perennial'),
- ('π₯', 'π₯, Carrot, biennial'),
- ('π', 'π, Eggplant, perennial'),
- ('π', 'π, Banana, perennial'),
- ('π
', 'π
, Tomato, annual'),
- ('π', 'π, Watermelon, annual'),
- ('π₯', 'π₯, Potato, perennial'),
- ]
- # [END plants_find_kv]
- assert_that(actual, equal_to(plants_find_all))
-
-
-def check_replace_all(actual):
- # [START plants_replace_all]
- plants_replace_all = [
- 'π,Strawberry,perennial',
- 'π₯,Carrot,biennial',
- 'π,Eggplant,perennial',
- 'π
,Tomato,annual',
- 'π₯,Potato,perennial',
- ]
- # [END plants_replace_all]
- assert_that(actual, equal_to(plants_replace_all))
-
-
-def check_replace_first(actual):
- # [START plants_replace_first]
- plants_replace_first = [
- 'π: Strawberry, perennial',
- 'π₯: Carrot, biennial',
- 'π: Eggplant, perennial',
- 'π
: Tomato, annual',
- 'π₯: Potato, perennial',
- ]
- # [END plants_replace_first]
- assert_that(actual, equal_to(plants_replace_first))
-
-
-def check_split(actual):
- # [START plants_split]
- plants_split = [
- ['π', 'Strawberry', 'perennial'],
- ['π₯', 'Carrot', 'biennial'],
- ['π', 'Eggplant', 'perennial'],
- ['π
', 'Tomato', 'annual'],
- ['π₯', 'Potato', 'perennial'],
- ]
- # [END plants_split]
- assert_that(actual, equal_to(plants_split))
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.regex.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class RegexTest(unittest.TestCase):
- def test_matches(self):
- regex.regex_matches(check_matches)
-
- def test_all_matches(self):
- regex.regex_all_matches(check_all_matches)
-
- def test_matches_kv(self):
- regex.regex_matches_kv(check_matches_kv)
-
- def test_find(self):
- regex.regex_find(check_matches)
-
- def test_find_all(self):
- regex.regex_find_all(check_find_all)
-
- def test_find_kv(self):
- regex.regex_find_kv(check_find_kv)
-
- def test_replace_all(self):
- regex.regex_replace_all(check_replace_all)
-
- def test_replace_first(self):
- regex.regex_replace_first(check_replace_first)
-
- def test_split(self):
- regex.regex_split(check_split)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values.py
deleted file mode 100644
index 8504ff4..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-
-def values(test=None):
- # [START values]
- import apache_beam as beam
-
- with beam.Pipeline() as pipeline:
- plants = (
- pipeline
- | 'Garden plants' >> beam.Create([
- ('π', 'Strawberry'),
- ('π₯', 'Carrot'),
- ('π', 'Eggplant'),
- ('π
', 'Tomato'),
- ('π₯', 'Potato'),
- ])
- | 'Values' >> beam.Values()
- | beam.Map(print)
- )
- # [END values]
- if test:
- test(plants)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values_test.py
deleted file mode 100644
index b43d911..0000000
--- a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/values_test.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# coding=utf-8
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import unittest
-
-import mock
-
-from apache_beam.examples.snippets.transforms.element_wise.values import *
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-
-
-@mock.patch('apache_beam.Pipeline', TestPipeline)
-# pylint: disable=line-too-long
-@mock.patch('apache_beam.examples.snippets.transforms.element_wise.values.print', lambda elem: elem)
-# pylint: enable=line-too-long
-class ValuesTest(unittest.TestCase):
- def __init__(self, methodName):
- super(ValuesTest, self).__init__(methodName)
- # [START plants]
- plants = [
- 'Strawberry',
- 'Carrot',
- 'Eggplant',
- 'Tomato',
- 'Potato',
- ]
- # [END plants]
- self.plants_test = lambda actual: assert_that(actual, equal_to(plants))
-
- def test_values(self):
- values(self.plants_test)
-
-
-if __name__ == '__main__':
- unittest.main()