"cells": [
"cell_type": "markdown",
"metadata": {
"id": "view-in-github"
"source": [
"<a href=\"\" target=\"_parent\"><img src=\"\" alt=\"Open in Colab\"/></a>"
"cell_type": "markdown",
"metadata": {
"id": "view-the-docs-top"
"source": [
"<table align=\"left\"><td><a target=\"_blank\" href=\"\"><img src=\"\" width=\"32\" height=\"32\" />View the docs</a></td></table>"
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "_-code"
"outputs": [],
"source": [
"#@title Licensed under the Apache License, Version 2.0 (the \"License\")\n",
"# Licensed to the Apache Software Foundation (ASF) under one\n",
"# or more contributor license agreements. See the NOTICE file\n",
"# distributed with this work for additional information\n",
"# regarding copyright ownership. The ASF licenses this file\n",
"# to you under the Apache License, Version 2.0 (the\n",
"# \"License\"); you may not use this file except in compliance\n",
"# with the License. You may obtain a copy of the License at\n",
"# Unless required by applicable law or agreed to in writing,\n",
"# software distributed under the License is distributed on an\n",
"# KIND, either express or implied. See the License for the\n",
"# specific language governing permissions and limitations\n",
"# under the License."
"cell_type": "markdown",
"metadata": {
"id": "pardo"
"source": [
"# ParDo\n",
"<script type=\"text/javascript\">\n",
"localStorage.setItem('language', 'language-py')\n",
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"Pydoc\"/> Pydoc</a>\n",
" </td>\n",
"A transform for generic parallel processing.\n",
"A `ParDo` transform considers each element in the input `PCollection`,\n",
"performs some processing function (your user code) on that element,\n",
"and emits zero or more elements to an output `PCollection`.\n",
"See more information in the\n",
"[Beam Programming Guide]("
"cell_type": "markdown",
"metadata": {
"id": "setup"
"source": [
"## Setup\n",
"To run a code cell, you can click the **Run cell** button at the top left of the cell,\n",
"or select it and press **`Shift+Enter`**.\n",
"Try modifying a code cell and re-running it to see what happens.\n",
"> To learn more about Colab, see\n",
"> [Welcome to Colaboratory!](\n",
"First, let's install the `apache-beam` module."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "setup-code"
"outputs": [],
"source": [
"!pip install --quiet -U apache-beam"
"cell_type": "markdown",
"metadata": {
"id": "examples"
"source": [
"## Examples\n",
"In the following examples, we explore how to create custom `DoFn`s and access\n",
"the timestamp and windowing information."
"cell_type": "markdown",
"metadata": {
"id": "example-1-pardo-with-a-simple-dofn"
"source": [
"### Example 1: ParDo with a simple DoFn\n",
"The following example defines a simple `DoFn` class called `SplitWords`\n",
"which stores the `delimiter` as an object field.\n",
"The `process` method is called once per element,\n",
"and it can yield zero or more output elements."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "example-1-pardo-with-a-simple-dofn-code"
"outputs": [],
"source": [
"import apache_beam as beam\n",
"class SplitWords(beam.DoFn):\n",
" def __init__(self, delimiter=','):\n",
" self.delimiter = delimiter\n",
" def process(self, text):\n",
" for word in text.split(self.delimiter):\n",
" yield word\n",
"with beam.Pipeline() as pipeline:\n",
" plants = (\n",
" pipeline\n",
" | 'Gardening plants' >> beam.Create([\n",
" '🍓Strawberry,🥕Carrot,🍆Eggplant',\n",
" '🍅Tomato,🥔Potato',\n",
" ])\n",
" | 'Split words' >> beam.ParDo(SplitWords(','))\n",
" | beam.Map(print))"
"cell_type": "markdown",
"metadata": {
"id": "example-1-pardo-with-a-simple-dofn-2"
"source": [
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
" </td>\n",
"cell_type": "markdown",
"metadata": {
"id": "example-2-pardo-with-timestamp-and-window-information"
"source": [
"### Example 2: ParDo with timestamp and window information\n",
"In this example, we add new parameters to the `process` method to bind parameter values at runtime.\n",
"* [`beam.DoFn.TimestampParam`](\n",
" binds the timestamp information as an\n",
" [`apache_beam.utils.timestamp.Timestamp`](\n",
" object.\n",
"* [`beam.DoFn.WindowParam`](\n",
" binds the window information as the appropriate\n",
" [`apache_beam.transforms.window.*Window`](\n",
" object."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "example-2-pardo-with-timestamp-and-window-information-code"
"outputs": [],
"source": [
"import apache_beam as beam\n",
"class AnalyzeElement(beam.DoFn):\n",
" def process(\n",
" self,\n",
" elem,\n",
" timestamp=beam.DoFn.TimestampParam,\n",
" window=beam.DoFn.WindowParam):\n",
" yield '\\n'.join([\n",
" '# timestamp',\n",
" 'type(timestamp) -> ' + repr(type(timestamp)),\n",
" 'timestamp.micros -> ' + repr(timestamp.micros),\n",
" 'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),\n",
" 'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),\n",
" '',\n",
" '# window',\n",
" 'type(window) -> ' + repr(type(window)),\n",
" 'window.start -> {} ({})'.format(\n",
" window.start, window.start.to_utc_datetime()),\n",
" 'window.end -> {} ({})'.format(\n",
" window.end, window.end.to_utc_datetime()),\n",
" 'window.max_timestamp() -> {} ({})'.format(\n",
" window.max_timestamp(), window.max_timestamp().to_utc_datetime()),\n",
" ])\n",
"with beam.Pipeline() as pipeline:\n",
" dofn_params = (\n",
" pipeline\n",
" | 'Create a single test element' >> beam.Create([':)'])\n",
" | 'Add timestamp (Spring equinox 2020)' >>\n",
" beam.Map(lambda elem: beam.window.TimestampedValue(elem, 1584675660))\n",
" |\n",
" 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))\n",
" | 'Analyze element' >> beam.ParDo(AnalyzeElement())\n",
" | beam.Map(print))"
"cell_type": "markdown",
"metadata": {
"id": "example-2-pardo-with-timestamp-and-window-information-2"
"source": [
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
" </td>\n",
"cell_type": "markdown",
"metadata": {
"id": "example-3-pardo-with-dofn-methods"
"source": [
"### Example 3: ParDo with DoFn methods\n",
"A [`DoFn`](\n",
"can be customized with a number of methods that can help create more complex behaviors.\n",
"You can customize what a worker does when it starts and shuts down with `setup` and `teardown`.\n",
"You can also customize what to do when a\n",
"[*bundle of elements*](\n",
"starts and finishes with `start_bundle` and `finish_bundle`.\n",
"* [`DoFn.setup()`](\n",
" Called *once per `DoFn` instance* when the `DoFn` instance is initialized.\n",
" `setup` need not to be cached, so it could be called more than once per worker.\n",
" This is a good place to connect to database instances, open network connections or other resources.\n",
"* [`DoFn.start_bundle()`](\n",
" Called *once per bundle of elements* before calling `process` on the first element of the bundle.\n",
" This is a good place to start keeping track of the bundle elements.\n",
"* [**`DoFn.process(element, *args, **kwargs)`**](\n",
" Called *once per element*, can *yield zero or more elements*.\n",
" Additional `*args` or `**kwargs` can be passed through\n",
" [`beam.ParDo()`](\n",
" **[required]**\n",
"* [`DoFn.finish_bundle()`](\n",
" Called *once per bundle of elements* after calling `process` after the last element of the bundle,\n",
" can *yield zero or more elements*. This is a good place to do batch calls on a bundle of elements,\n",
" such as running a database query.\n",
" For example, you can initialize a batch in `start_bundle`,\n",
" add elements to the batch in `process` instead of yielding them,\n",
" then running a batch query on those elements on `finish_bundle`, and yielding all the results.\n",
" Note that yielded elements from `finish_bundle` must be of the type\n",
" [`apache_beam.utils.windowed_value.WindowedValue`](\n",
" You need to provide a timestamp as a unix timestamp, which you can get from the last processed element.\n",
" You also need to provide a window, which you can get from the last processed element like in the example below.\n",
"* [`DoFn.teardown()`](\n",
" Called *once (as a best effort) per `DoFn` instance* when the `DoFn` instance is shutting down.\n",
" This is a good place to close database instances, close network connections or other resources.\n",
" Note that `teardown` is called as a *best effort* and is *not guaranteed*.\n",
" For example, if the worker crashes, `teardown` might not be called."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "example-3-pardo-with-dofn-methods-code"
"outputs": [],
"source": [
"import apache_beam as beam\n",
"class DoFnMethods(beam.DoFn):\n",
" def __init__(self):\n",
" print('__init__')\n",
" self.window = beam.window.GlobalWindow()\n",
" def setup(self):\n",
" print('setup')\n",
" def start_bundle(self):\n",
" print('start_bundle')\n",
" def process(self, element, window=beam.DoFn.WindowParam):\n",
" self.window = window\n",
" yield '* process: ' + element\n",
" def finish_bundle(self):\n",
" yield beam.utils.windowed_value.WindowedValue(\n",
" value='* finish_bundle: 🌱🌳🌍',\n",
" timestamp=0,\n",
" windows=[self.window],\n",
" )\n",
" def teardown(self):\n",
" print('teardown')\n",
"with beam.Pipeline() as pipeline:\n",
" results = (\n",
" pipeline\n",
" | 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔'])\n",
" | 'DoFn methods' >> beam.ParDo(DoFnMethods())\n",
" | beam.Map(print))"
"cell_type": "markdown",
"metadata": {
"id": "example-3-pardo-with-dofn-methods-2"
"source": [
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
" </td>\n",
"> *Known issues:*\n",
"> * [[BEAM-7885]](\n",
"> `DoFn.setup()` doesn't run for streaming jobs running in the `DirectRunner`.\n",
"> * [[BEAM-7340]](\n",
"> `DoFn.teardown()` metrics are lost."
"cell_type": "markdown",
"metadata": {
"id": "related-transforms"
"source": [
"## Related transforms\n",
"* [Map]( behaves the same, but produces exactly one output for each input.\n",
"* [FlatMap]( behaves the same as `Map`,\n",
" but for each input it may produce zero or more outputs.\n",
"* [Filter]( is useful if the function is just\n",
" deciding whether to output an element or not.\n",
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"Pydoc\"/> Pydoc</a>\n",
" </td>\n",
"cell_type": "markdown",
"metadata": {
"id": "view-the-docs-bottom"
"source": [
"<table align=\"left\"><td><a target=\"_blank\" href=\"\"><img src=\"\" width=\"32\" height=\"32\" />View the docs</a></td></table>"
"metadata": {
"colab": {
"name": "ParDo - element-wise transform",
"toc_visible": true
"kernelspec": {
"display_name": "python3",
"name": "python3"
"nbformat": 4,
"nbformat_minor": 4