| { |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "view-in-github" |
| }, |
| "source": [ |
| "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/documentation/transforms/python/elementwise/pardo-py.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open in Colab\"/></a>" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "view-the-docs-top" |
| }, |
| "source": [ |
| "<table align=\"left\"><td><a target=\"_blank\" href=\"https://beam.apache.org/documentation/transforms/python/elementwise/pardo\"><img src=\"https://beam.apache.org/images/logos/full-color/name-bottom/beam-logo-full-color-name-bottom-100.png\" width=\"32\" height=\"32\" />View the docs</a></td></table>" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "cellView": "form", |
| "id": "_-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "#@title Licensed under the Apache License, Version 2.0 (the \"License\")\n", |
| "# Licensed to the Apache Software Foundation (ASF) under one\n", |
| "# or more contributor license agreements. See the NOTICE file\n", |
| "# distributed with this work for additional information\n", |
| "# regarding copyright ownership. The ASF licenses this file\n", |
| "# to you under the Apache License, Version 2.0 (the\n", |
| "# \"License\"); you may not use this file except in compliance\n", |
| "# with the License. You may obtain a copy of the License at\n", |
| "#\n", |
| "# http://www.apache.org/licenses/LICENSE-2.0\n", |
| "#\n", |
| "# Unless required by applicable law or agreed to in writing,\n", |
| "# software distributed under the License is distributed on an\n", |
| "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", |
| "# KIND, either express or implied. See the License for the\n", |
| "# specific language governing permissions and limitations\n", |
| "# under the License." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "pardo" |
| }, |
| "source": [ |
| "# ParDo\n", |
| "\n", |
| "<table align=\"left\" style=\"margin-right:1em\">\n", |
| " <td>\n", |
| " <a class=\"button\" target=\"_blank\" href=\"https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo\"><img src=\"https://beam.apache.org/images/logos/sdks/python.png\" width=\"32px\" height=\"32px\" alt=\"Pydoc\"/> Pydoc</a>\n", |
| " </td>\n", |
| "</table>\n", |
| "\n", |
| "<br/><br/><br/>\n", |
| "\n", |
| "A transform for generic parallel processing.\n", |
| "A `ParDo` transform considers each element in the input `PCollection`,\n", |
| "performs some processing function (your user code) on that element,\n", |
| "and emits zero or more elements to an output `PCollection`.\n", |
| "\n", |
| "See more information in the\n", |
| "[Beam Programming Guide](https://beam.apache.org/documentation/programming-guide/#pardo)." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "setup" |
| }, |
| "source": [ |
| "## Setup\n", |
| "\n", |
| "To run a code cell, you can click the **Run cell** button at the top left of the cell,\n", |
| "or select it and press **`Shift+Enter`**.\n", |
| "Try modifying a code cell and re-running it to see what happens.\n", |
| "\n", |
| "> To learn more about Colab, see\n", |
| "> [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb).\n", |
| "\n", |
| "First, let's install the `apache-beam` module." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "setup-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "!pip install --quiet -U apache-beam" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "examples" |
| }, |
| "source": [ |
| "## Examples\n", |
| "\n", |
| "In the following examples, we explore how to create custom `DoFn`s and access\n", |
| "the timestamp and windowing information." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-1-pardo-with-a-simple-dofn" |
| }, |
| "source": [ |
| "### Example 1: ParDo with a simple DoFn\n", |
| "\n", |
| "The following example defines a simple `DoFn` class called `SplitWords`\n", |
| "which stores the `delimiter` as an object field.\n", |
| "The `process` method is called once per element,\n", |
| "and it can yield zero or more output elements." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "example-1-pardo-with-a-simple-dofn-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "class SplitWords(beam.DoFn):\n", |
| "  def __init__(self, delimiter=','):\n", |
| "    self.delimiter = delimiter\n", |
| "\n", |
| "  def process(self, text):\n", |
| "    for word in text.split(self.delimiter):\n", |
| "      yield word\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  plants = (\n", |
| "      pipeline\n", |
| "      | 'Gardening plants' >> beam.Create([\n", |
| "          '🍓Strawberry,🥕Carrot,🍆Eggplant',\n", |
| "          '🍅Tomato,🥔Potato',\n", |
| "      ])\n", |
| "      | 'Split words' >> beam.ParDo(SplitWords(','))\n", |
| "      | beam.Map(print)\n", |
| "  )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-1-pardo-with-a-simple-dofn-2" |
| }, |
| "source": [ |
| "<table align=\"left\" style=\"margin-right:1em\">\n", |
| " <td>\n", |
| " <a class=\"button\" target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n", |
| " </td>\n", |
| "</table>\n", |
| "\n", |
| "<br/><br/><br/>" |
| ] |
| }, |
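| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "sketch-pardo-with-extra-arguments" |
| }, |
| "source": [ |
| "As a variation on the example above, the delimiter can also be supplied when the pipeline is built:\n", |
| "extra positional or keyword arguments given to `beam.ParDo()` are forwarded to `process`.\n", |
| "The following is a minimal sketch, assuming a hypothetical `SplitWordsWithArg` class\n", |
| "that is not part of the Beam snippets." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "sketch-pardo-with-extra-arguments-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "class SplitWordsWithArg(beam.DoFn):\n", |
| "  # Hypothetical variant of SplitWords: the delimiter arrives as an extra\n", |
| "  # `process` argument instead of being stored as an object field.\n", |
| "  def process(self, text, delimiter=','):\n", |
| "    for word in text.split(delimiter):\n", |
| "      yield word\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  plants = (\n", |
| "      pipeline\n", |
| "      | 'Gardening plants' >> beam.Create([\n", |
| "          '🍓Strawberry;🥕Carrot;🍆Eggplant',\n", |
| "          '🍅Tomato;🥔Potato',\n", |
| "      ])\n", |
| "      # The keyword argument given to ParDo is forwarded to `process`.\n", |
| "      | 'Split words' >> beam.ParDo(SplitWordsWithArg(), delimiter=';')\n", |
| "      | beam.Map(print)\n", |
| "  )" |
| ] |
| }, |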
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-2-pardo-with-timestamp-and-window-information" |
| }, |
| "source": [ |
| "### Example 2: ParDo with timestamp and window information\n", |
| "\n", |
| "In this example, we add new parameters to the `process` method to bind parameter values at runtime.\n", |
| "\n", |
| "* [`beam.DoFn.TimestampParam`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.TimestampParam)\n", |
| " binds the timestamp information as an\n", |
| " [`apache_beam.utils.timestamp.Timestamp`](https://beam.apache.org/releases/pydoc/current/apache_beam.utils.timestamp.html#apache_beam.utils.timestamp.Timestamp)\n", |
| " object.\n", |
| "* [`beam.DoFn.WindowParam`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.WindowParam)\n", |
| " binds the window information as the appropriate\n", |
| " [`apache_beam.transforms.window.*Window`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html)\n", |
| " object." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "example-2-pardo-with-timestamp-and-window-information-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "class AnalyzeElement(beam.DoFn):\n", |
| "  def process(self, elem, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):\n", |
| "    yield '\\n'.join([\n", |
| "        '# timestamp',\n", |
| "        'type(timestamp) -> ' + repr(type(timestamp)),\n", |
| "        'timestamp.micros -> ' + repr(timestamp.micros),\n", |
| "        'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),\n", |
| "        'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),\n", |
| "        '',\n", |
| "        '# window',\n", |
| "        'type(window) -> ' + repr(type(window)),\n", |
| "        'window.start -> {} ({})'.format(window.start, window.start.to_utc_datetime()),\n", |
| "        'window.end -> {} ({})'.format(window.end, window.end.to_utc_datetime()),\n", |
| "        'window.max_timestamp() -> {} ({})'.format(window.max_timestamp(), window.max_timestamp().to_utc_datetime()),\n", |
| "    ])\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  dofn_params = (\n", |
| "      pipeline\n", |
| "      | 'Create a single test element' >> beam.Create([':)'])\n", |
| "      | 'Add timestamp (Spring equinox 2020)' >> beam.Map(\n", |
| "          lambda elem: beam.window.TimestampedValue(elem, 1584675660))\n", |
| "      | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))\n", |
| "      | 'Analyze element' >> beam.ParDo(AnalyzeElement())\n", |
| "      | beam.Map(print)\n", |
| "  )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-2-pardo-with-timestamp-and-window-information-2" |
| }, |
| "source": [ |
| "<table align=\"left\" style=\"margin-right:1em\">\n", |
| " <td>\n", |
| " <a class=\"button\" target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n", |
| " </td>\n", |
| "</table>\n", |
| "\n", |
| "<br/><br/><br/>" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-3-pardo-with-dofn-methods" |
| }, |
| "source": [ |
| "### Example 3: ParDo with DoFn methods\n", |
| "\n", |
| "A [`DoFn`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn)\n", |
| "can be customized with a number of methods that can help create more complex behaviors.\n", |
| "You can customize what a worker does when it starts and shuts down with `setup` and `teardown`.\n", |
| "You can also customize what to do when a\n", |
| "[*bundle of elements*](https://beam.apache.org/documentation/runtime/model/#bundling-and-persistence)\n", |
| "starts and finishes with `start_bundle` and `finish_bundle`.\n", |
| "\n", |
| "* [`DoFn.setup()`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.setup):\n", |
| "  Called *once per `DoFn` instance* when the `DoFn` instance is initialized.\n", |
| "  A runner need not cache `DoFn` instances, so `setup` could be called more than once per worker.\n", |
| "  This is a good place to connect to database instances, open network connections, or acquire other resources.\n", |
| "\n", |
| "* [`DoFn.start_bundle()`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.start_bundle):\n", |
| " Called *once per bundle of elements* before calling `process` on the first element of the bundle.\n", |
| " This is a good place to start keeping track of the bundle elements.\n", |
| "\n", |
| "* [**`DoFn.process(element, *args, **kwargs)`**](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.process):\n", |
| " Called *once per element*, can *yield zero or more elements*.\n", |
| " Additional `*args` or `**kwargs` can be passed through\n", |
| " [`beam.ParDo()`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo).\n", |
| " **[required]**\n", |
| "\n", |
| "* [`DoFn.finish_bundle()`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.finish_bundle):\n", |
| "  Called *once per bundle of elements* after calling `process` on the last element of the bundle,\n", |
| "  can *yield zero or more elements*. This is a good place to do batch calls on a bundle of elements,\n", |
| "  such as running a database query.\n", |
| "\n", |
| "  For example, you can initialize a batch in `start_bundle`,\n", |
| "  add elements to the batch in `process` instead of yielding them,\n", |
| "  then run a batch query on those elements in `finish_bundle` and yield all the results.\n", |
| "\n", |
| "  Note that yielded elements from `finish_bundle` must be of the type\n", |
| "  [`apache_beam.utils.windowed_value.WindowedValue`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/windowed_value.py).\n", |
| "  You need to provide a timestamp as a unix timestamp, which you can get from the last processed element.\n", |
| "  You also need to provide a window, which you can get from the last processed element like in the example below.\n", |
| "\n", |
| "* [`DoFn.teardown()`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.teardown):\n", |
| "  Called *once (as a best effort) per `DoFn` instance* when the `DoFn` instance is shutting down.\n", |
| "  This is a good place to close database connections, close network connections, or release other resources.\n", |
| "\n", |
| "  Note that `teardown` is called as a *best effort* and is *not guaranteed*.\n", |
| "  For example, if the worker crashes, `teardown` might not be called." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "example-3-pardo-with-dofn-methods-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "class DoFnMethods(beam.DoFn):\n", |
| "  def __init__(self):\n", |
| "    print('__init__')\n", |
| "    self.window = beam.window.GlobalWindow()\n", |
| "\n", |
| "  def setup(self):\n", |
| "    print('setup')\n", |
| "\n", |
| "  def start_bundle(self):\n", |
| "    print('start_bundle')\n", |
| "\n", |
| "  def process(self, element, window=beam.DoFn.WindowParam):\n", |
| "    self.window = window\n", |
| "    yield '* process: ' + element\n", |
| "\n", |
| "  def finish_bundle(self):\n", |
| "    yield beam.utils.windowed_value.WindowedValue(\n", |
| "        value='* finish_bundle: 🌱🌳🌍',\n", |
| "        timestamp=0,\n", |
| "        windows=[self.window],\n", |
| "    )\n", |
| "\n", |
| "  def teardown(self):\n", |
| "    print('teardown')\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  results = (\n", |
| "      pipeline\n", |
| "      | 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔'])\n", |
| "      | 'DoFn methods' >> beam.ParDo(DoFnMethods())\n", |
| "      | beam.Map(print)\n", |
| "  )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-3-pardo-with-dofn-methods-2" |
| }, |
| "source": [ |
| "<table align=\"left\" style=\"margin-right:1em\">\n", |
| " <td>\n", |
| " <a class=\"button\" target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n", |
| " </td>\n", |
| "</table>\n", |
| "\n", |
| "<br/><br/><br/>\n", |
| "\n", |
| "> *Known issues:*\n", |
| ">\n", |
| "> * [[BEAM-7885]](https://issues.apache.org/jira/browse/BEAM-7885)\n", |
| "> `DoFn.setup()` doesn't run for streaming jobs running in the `DirectRunner`.\n", |
| "> * [[BEAM-7340]](https://issues.apache.org/jira/browse/BEAM-7340)\n", |
| "> `DoFn.teardown()` metrics are lost." |
| ] |
| }, |
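| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "sketch-batching-with-bundle-methods" |
| }, |
| "source": [ |
| "The batching pattern described for `finish_bundle` can be sketched as follows.\n", |
| "This is a minimal illustration, assuming a hypothetical `BatchUppercase` class in which\n", |
| "`str.upper` stands in for a real batch call such as a database query.\n", |
| "It also assumes that all elements of a bundle share one window, as they do here in the global window,\n", |
| "and it emits results with timestamp `0` like the example above." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "sketch-batching-with-bundle-methods-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "class BatchUppercase(beam.DoFn):\n", |
| "  # Hypothetical DoFn: buffers the elements of a bundle and emits them together.\n", |
| "  def start_bundle(self):\n", |
| "    # Start a fresh batch for every bundle.\n", |
| "    self.batch = []\n", |
| "    self.window = beam.window.GlobalWindow()\n", |
| "\n", |
| "  def process(self, element, window=beam.DoFn.WindowParam):\n", |
| "    # Buffer the element instead of yielding it, and remember its window.\n", |
| "    self.window = window\n", |
| "    self.batch.append(element)\n", |
| "\n", |
| "  def finish_bundle(self):\n", |
| "    # Stand-in for a single batch call, such as one database query.\n", |
| "    results = [element.upper() for element in self.batch]\n", |
| "    for result in results:\n", |
| "      yield beam.utils.windowed_value.WindowedValue(\n", |
| "          value=result,\n", |
| "          timestamp=0,\n", |
| "          windows=[self.window],\n", |
| "      )\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  results = (\n", |
| "      pipeline\n", |
| "      | 'Create inputs' >> beam.Create(['strawberry', 'carrot', 'eggplant'])\n", |
| "      | 'Batch uppercase' >> beam.ParDo(BatchUppercase())\n", |
| "      | beam.Map(print)\n", |
| "  )" |
| ] |
| }, |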
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "related-transforms" |
| }, |
| "source": [ |
| "## Related transforms\n", |
| "\n", |
| "* [Map](https://beam.apache.org/documentation/transforms/python/elementwise/map) behaves the same, but produces exactly one output for each input.\n", |
| "* [FlatMap](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap) behaves the same as `Map`,\n", |
| " but for each input it may produce zero or more outputs.\n", |
| "* [Filter](https://beam.apache.org/documentation/transforms/python/elementwise/filter) is useful if the function is just\n", |
| " deciding whether to output an element or not.\n", |
| "\n", |
| "<table align=\"left\" style=\"margin-right:1em\">\n", |
| " <td>\n", |
| " <a class=\"button\" target=\"_blank\" href=\"https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo\"><img src=\"https://beam.apache.org/images/logos/sdks/python.png\" width=\"32px\" height=\"32px\" alt=\"Pydoc\"/> Pydoc</a>\n", |
| " </td>\n", |
| "</table>\n", |
| "\n", |
| "<br/><br/><br/>" |
| ] |
| }, |
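| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "sketch-pardo-versus-flatmap" |
| }, |
| "source": [ |
| "To illustrate the relationship with `FlatMap`, the following sketch splits the same lines\n", |
| "once with `ParDo` and once with `FlatMap`.\n", |
| "The `SplitWords` class and the step labels are illustrative only.\n", |
| "Both branches should emit the same words, although the order in which they are printed is not guaranteed." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "sketch-pardo-versus-flatmap-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "class SplitWords(beam.DoFn):\n", |
| "  def process(self, text):\n", |
| "    for word in text.split(','):\n", |
| "      yield word\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  lines = pipeline | 'Create lines' >> beam.Create([\n", |
| "      '🍓Strawberry,🥕Carrot',\n", |
| "      '🍅Tomato',\n", |
| "  ])\n", |
| "\n", |
| "  # ParDo with a DoFn that yields zero or more outputs per input.\n", |
| "  with_pardo = lines | 'Split with ParDo' >> beam.ParDo(SplitWords())\n", |
| "  # FlatMap with a function that returns an iterable of outputs per input.\n", |
| "  with_flatmap = lines | 'Split with FlatMap' >> beam.FlatMap(lambda text: text.split(','))\n", |
| "\n", |
| "  with_pardo | 'Print ParDo' >> beam.Map(lambda word: print('ParDo:', word))\n", |
| "  with_flatmap | 'Print FlatMap' >> beam.Map(lambda word: print('FlatMap:', word))" |
| ] |
| }, |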
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "view-the-docs-bottom" |
| }, |
| "source": [ |
| "<table align=\"left\"><td><a target=\"_blank\" href=\"https://beam.apache.org/documentation/transforms/python/elementwise/pardo\"><img src=\"https://beam.apache.org/images/logos/full-color/name-bottom/beam-logo-full-color-name-bottom-100.png\" width=\"32\" height=\"32\" />View the docs</a></td></table>" |
| ] |
| } |
| ], |
| "metadata": { |
| "colab": { |
| "name": "ParDo - element-wise transform", |
| "toc_visible": true |
| }, |
| "kernelspec": { |
| "display_name": "python3", |
| "name": "python3" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 2 |
| } |