"cells": [
"cell_type": "markdown",
"metadata": {
"id": "view-in-github"
"source": [
"<a href=\"\" target=\"_parent\"><img src=\"\" alt=\"Open in Colab\"/></a>"
"cell_type": "markdown",
"metadata": {
"id": "view-the-docs-top"
"source": [
"<table align=\"left\"><td><a target=\"_blank\" href=\"\"><img src=\"\" width=\"32\" height=\"32\" />View the docs</a></td></table>"
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"id": "_-code"
"outputs": [],
"source": [
"#@title Licensed under the Apache License, Version 2.0 (the \"License\")\n",
"# Licensed to the Apache Software Foundation (ASF) under one\n",
"# or more contributor license agreements. See the NOTICE file\n",
"# distributed with this work for additional information\n",
"# regarding copyright ownership. The ASF licenses this file\n",
"# to you under the Apache License, Version 2.0 (the\n",
"# \"License\"); you may not use this file except in compliance\n",
"# with the License. You may obtain a copy of the License at\n",
"# Unless required by applicable law or agreed to in writing,\n",
"# software distributed under the License is distributed on an\n",
"# KIND, either express or implied. See the License for the\n",
"# specific language governing permissions and limitations\n",
"# under the License."
"cell_type": "markdown",
"metadata": {
"id": "flatmap"
"source": [
"# FlatMap\n",
"<script type=\"text/javascript\">\n",
"localStorage.setItem('language', 'language-py')\n",
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"Pydoc\"/> Pydoc</a>\n",
" </td>\n",
"Applies a simple 1-to-many mapping function over each element in the collection.\n",
"The many elements are flattened into the resulting collection."
"cell_type": "markdown",
"metadata": {
"id": "setup"
"source": [
"## Setup\n",
"To run a code cell, you can click the **Run cell** button at the top left of the cell,\n",
"or select it and press **`Shift+Enter`**.\n",
"Try modifying a code cell and re-running it to see what happens.\n",
 To learn more">
"\n",
"> To learn more about Colab, see *Welcome to Colaboratory!*\n",
"\n",
"First, let's install the `apache-beam` module."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "setup-code"
"outputs": [],
"source": [
"!pip install --quiet -U apache-beam"
"cell_type": "markdown",
"metadata": {
"id": "examples"
"source": [
"## Examples\n",
"In the following examples, we create a pipeline with a `PCollection` of produce with their icon, name, and duration.\n",
"Then, we apply `FlatMap` in multiple ways to yield zero or more elements per each input element into the resulting `PCollection`.\n",
"`FlatMap` accepts a function that returns an `iterable`,\n",
"where each of the output `iterable`'s elements is an element of the resulting `PCollection`."
"cell_type": "markdown",
"metadata": {
"id": "example-1-flatmap-with-a-predefined-function"
"source": [
"### Example 1: FlatMap with a predefined function\n",
"We use the function `str.split` which takes a single `str` element and outputs a `list` of `str`s.\n",
"This pipeline splits the input element using whitespaces, creating a list of zero or more elements."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "example-1-flatmap-with-a-predefined-function-code"
"outputs": [],
"source": [
"import apache_beam as beam\n",
"with beam.Pipeline() as pipeline:\n",
" plants = (\n",
" pipeline\n",
" | 'Gardening plants' >> beam.Create([\n",
" '🍓Strawberry 🥕Carrot 🍆Eggplant',\n",
" '🍅Tomato 🥔Potato',\n",
" ])\n",
" | 'Split words' >> beam.FlatMap(str.split)\n",
" | beam.Map(print))"
"cell_type": "markdown",
"metadata": {
"id": "example-1-flatmap-with-a-predefined-function-2"
"source": [
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
" </td>\n",
"cell_type": "markdown",
"metadata": {
"id": "example-2-flatmap-with-a-function"
"source": [
"### Example 2: FlatMap with a function\n",
"We define a function `split_words` which splits an input `str` element using the delimiter `','` and outputs a `list` of `str`s."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "example-2-flatmap-with-a-function-code"
"outputs": [],
"source": [
"import apache_beam as beam\n",
"def split_words(text):\n",
" return text.split(',')\n",
"with beam.Pipeline() as pipeline:\n",
" plants = (\n",
" pipeline\n",
" | 'Gardening plants' >> beam.Create([\n",
" '🍓Strawberry,🥕Carrot,🍆Eggplant',\n",
" '🍅Tomato,🥔Potato',\n",
" ])\n",
" | 'Split words' >> beam.FlatMap(split_words)\n",
" | beam.Map(print))"
"cell_type": "markdown",
"metadata": {
"id": "example-2-flatmap-with-a-function-2"
"source": [
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
" </td>\n",
"cell_type": "markdown",
"metadata": {
"id": "example-3-flatmap-with-a-lambda-function"
"source": [
"### Example 3: FlatMap with a lambda function\n",
"For this example, we want to flatten a `PCollection` of lists of `str`s into a `PCollection` of `str`s.\n",
"Each input element is already an `iterable`, where each element is what we want in the resulting `PCollection`.\n",
"We use a lambda function that returns the same input element it received."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "example-3-flatmap-with-a-lambda-function-code"
"outputs": [],
"source": [
"import apache_beam as beam\n",
"with beam.Pipeline() as pipeline:\n",
" plants = (\n",
" pipeline\n",
" | 'Gardening plants' >> beam.Create([\n",
" ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],\n",
" ['🍅Tomato', '🥔Potato'],\n",
" ])\n",
" | 'Flatten lists' >> beam.FlatMap(lambda elements: elements)\n",
" | beam.Map(print))"
"cell_type": "markdown",
"metadata": {
"id": "example-3-flatmap-with-a-lambda-function-2"
"source": [
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
" </td>\n",
"cell_type": "markdown",
"metadata": {
"id": "example-4-flatmap-with-a-generator"
"source": [
"### Example 4: FlatMap with a generator\n",
"For this example, we want to flatten a `PCollection` of lists of `str`s into a `PCollection` of `str`s.\n",
"We use a generator to iterate over the input list and yield each of the elements.\n",
"Each yielded result in the generator is an element in the resulting `PCollection`."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "example-4-flatmap-with-a-generator-code"
"outputs": [],
"source": [
"import apache_beam as beam\n",
"def generate_elements(elements):\n",
" for element in elements:\n",
" yield element\n",
"with beam.Pipeline() as pipeline:\n",
" plants = (\n",
" pipeline\n",
" | 'Gardening plants' >> beam.Create([\n",
" ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],\n",
" ['🍅Tomato', '🥔Potato'],\n",
" ])\n",
" | 'Flatten lists' >> beam.FlatMap(generate_elements)\n",
" | beam.Map(print))"
"cell_type": "markdown",
"metadata": {
"id": "example-4-flatmap-with-a-generator-2"
"source": [
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
" </td>\n",
"cell_type": "markdown",
"metadata": {
"id": "example-5-flatmaptuple-for-key-value-pairs"
"source": [
"### Example 5: FlatMapTuple for key-value pairs\n",
"If your `PCollection` consists of `(key, value)` pairs,\n",
"you can use `FlatMapTuple` to unpack them into different function arguments."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "example-5-flatmaptuple-for-key-value-pairs-code"
"outputs": [],
"source": [
"import apache_beam as beam\n",
"def format_plant(icon, plant):\n",
" if icon:\n",
" yield '{}{}'.format(icon, plant)\n",
"with beam.Pipeline() as pipeline:\n",
" plants = (\n",
" pipeline\n",
" | 'Gardening plants' >> beam.Create([\n",
" ('🍓', 'Strawberry'),\n",
" ('🥕', 'Carrot'),\n",
" ('🍆', 'Eggplant'),\n",
" ('🍅', 'Tomato'),\n",
" ('🥔', 'Potato'),\n",
" (None, 'Invalid'),\n",
" ])\n",
" | 'Format' >> beam.FlatMapTuple(format_plant)\n",
" | beam.Map(print))"
"cell_type": "markdown",
"metadata": {
"id": "example-5-flatmaptuple-for-key-value-pairs-2"
"source": [
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
" </td>\n",
"cell_type": "markdown",
"metadata": {
"id": "example-6-flatmap-with-multiple-arguments"
"source": [
"### Example 6: FlatMap with multiple arguments\n",
"You can pass functions with multiple arguments to `FlatMap`.\n",
"They are passed as additional positional arguments or keyword arguments to the function.\n",
"In this example, `split_words` takes `text` and `delimiter` as arguments."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "example-6-flatmap-with-multiple-arguments-code"
"outputs": [],
"source": [
"import apache_beam as beam\n",
"def split_words(text, delimiter=None):\n",
" return text.split(delimiter)\n",
"with beam.Pipeline() as pipeline:\n",
" plants = (\n",
" pipeline\n",
" | 'Gardening plants' >> beam.Create([\n",
" '🍓Strawberry,🥕Carrot,🍆Eggplant',\n",
" '🍅Tomato,🥔Potato',\n",
" ])\n",
" | 'Split words' >> beam.FlatMap(split_words, delimiter=',')\n",
" | beam.Map(print))"
"cell_type": "markdown",
"metadata": {
"id": "example-6-flatmap-with-multiple-arguments-2"
"source": [
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
" </td>\n",
"cell_type": "markdown",
"metadata": {
"id": "example-7-flatmap-with-side-inputs-as-singletons"
"source": [
"### Example 7: FlatMap with side inputs as singletons\n",
"If the `PCollection` has a single value, such as the average from another computation,\n",
"passing the `PCollection` as a *singleton* accesses that value.\n",
"In this example, we pass a `PCollection` the value `','` as a singleton.\n",
"We then use that value as the delimiter for the `str.split` method."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "example-7-flatmap-with-side-inputs-as-singletons-code"
"outputs": [],
"source": [
"import apache_beam as beam\n",
"with beam.Pipeline() as pipeline:\n",
" delimiter = pipeline | 'Create delimiter' >> beam.Create([','])\n",
" plants = (\n",
" pipeline\n",
" | 'Gardening plants' >> beam.Create([\n",
" '🍓Strawberry,🥕Carrot,🍆Eggplant',\n",
" '🍅Tomato,🥔Potato',\n",
" ])\n",
" | 'Split words' >> beam.FlatMap(\n",
" lambda text,\n",
" delimiter: text.split(delimiter),\n",
" delimiter=beam.pvalue.AsSingleton(delimiter),\n",
" )\n",
" | beam.Map(print))"
"cell_type": "markdown",
"metadata": {
"id": "example-7-flatmap-with-side-inputs-as-singletons-2"
"source": [
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
" </td>\n",
"cell_type": "markdown",
"metadata": {
"id": "example-8-flatmap-with-side-inputs-as-iterators"
"source": [
"### Example 8: FlatMap with side inputs as iterators\n",
"If the `PCollection` has multiple values, pass the `PCollection` as an *iterator*.\n",
"This accesses elements lazily as they are needed,\n",
"so it is possible to iterate over large `PCollection`s that won't fit into memory."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "example-8-flatmap-with-side-inputs-as-iterators-code"
"outputs": [],
"source": [
"import apache_beam as beam\n",
"def normalize_and_validate_durations(plant, valid_durations):\n",
" plant['duration'] = plant['duration'].lower()\n",
" if plant['duration'] in valid_durations:\n",
" yield plant\n",
"with beam.Pipeline() as pipeline:\n",
" valid_durations = pipeline | 'Valid durations' >> beam.Create([\n",
" 'annual',\n",
" 'biennial',\n",
" 'perennial',\n",
" ])\n",
" valid_plants = (\n",
" pipeline\n",
" | 'Gardening plants' >> beam.Create([\n",
" {\n",
" 'icon': '🍓', 'name': 'Strawberry', 'duration': 'Perennial'\n",
" },\n",
" {\n",
" 'icon': '🥕', 'name': 'Carrot', 'duration': 'BIENNIAL'\n",
" },\n",
" {\n",
" 'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'\n",
" },\n",
" {\n",
" 'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'\n",
" },\n",
" {\n",
" 'icon': '🥔', 'name': 'Potato', 'duration': 'unknown'\n",
" },\n",
" ])\n",
" | 'Normalize and validate durations' >> beam.FlatMap(\n",
" normalize_and_validate_durations,\n",
" valid_durations=beam.pvalue.AsIter(valid_durations),\n",
" )\n",
" | beam.Map(print))"
"cell_type": "markdown",
"metadata": {
"id": "example-8-flatmap-with-side-inputs-as-iterators-2"
"source": [
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
" </td>\n",
"> **Note**: You can pass the `PCollection` as a *list* with `beam.pvalue.AsList(pcollection)`,\n",
"> but this requires that all the elements fit into memory."
"cell_type": "markdown",
"metadata": {
"id": "example-9-flatmap-with-side-inputs-as-dictionaries"
"source": [
"### Example 9: FlatMap with side inputs as dictionaries\n",
"If a `PCollection` is small enough to fit into memory, then that `PCollection` can be passed as a *dictionary*.\n",
"Each element must be a `(key, value)` pair.\n",
"Note that all the elements of the `PCollection` must fit into memory for this.\n",
"If the `PCollection` won't fit into memory, use `beam.pvalue.AsIter(pcollection)` instead."
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "example-9-flatmap-with-side-inputs-as-dictionaries-code"
"outputs": [],
"source": [
"import apache_beam as beam\n",
"def replace_duration_if_valid(plant, durations):\n",
" if plant['duration'] in durations:\n",
" plant['duration'] = durations[plant['duration']]\n",
" yield plant\n",
"with beam.Pipeline() as pipeline:\n",
" durations = pipeline | 'Durations dict' >> beam.Create([\n",
" (0, 'annual'),\n",
" (1, 'biennial'),\n",
" (2, 'perennial'),\n",
" ])\n",
" valid_plants = (\n",
" pipeline\n",
" | 'Gardening plants' >> beam.Create([\n",
" {\n",
" 'icon': '🍓', 'name': 'Strawberry', 'duration': 2\n",
" },\n",
" {\n",
" 'icon': '🥕', 'name': 'Carrot', 'duration': 1\n",
" },\n",
" {\n",
" 'icon': '🍆', 'name': 'Eggplant', 'duration': 2\n",
" },\n",
" {\n",
" 'icon': '🍅', 'name': 'Tomato', 'duration': 0\n",
" },\n",
" {\n",
" 'icon': '🥔', 'name': 'Potato', 'duration': -1\n",
" },\n",
" ])\n",
" | 'Replace duration if valid' >> beam.FlatMap(\n",
" replace_duration_if_valid,\n",
" durations=beam.pvalue.AsDict(durations),\n",
" )\n",
" | beam.Map(print))"
"cell_type": "markdown",
"metadata": {
"id": "example-9-flatmap-with-side-inputs-as-dictionaries-2"
"source": [
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n",
" </td>\n",
"cell_type": "markdown",
"metadata": {
"id": "related-transforms"
"source": [
"## Related transforms\n",
"* [Filter]( is useful if the function is just\n",
" deciding whether to output an element or not.\n",
"* [ParDo]( is the most general elementwise mapping\n",
" operation, and includes other abilities such as multiple output collections and side-inputs.\n",
"* [Map]( behaves the same, but produces exactly one output for each input.\n",
"<table align=\"left\" style=\"margin-right:1em\">\n",
" <td>\n",
" <a class=\"button\" target=\"_blank\" href=\"\"><img src=\"\" width=\"32px\" height=\"32px\" alt=\"Pydoc\"/> Pydoc</a>\n",
" </td>\n",
"cell_type": "markdown",
"metadata": {
"id": "view-the-docs-bottom"
"source": [
"<table align=\"left\"><td><a target=\"_blank\" href=\"\"><img src=\"\" width=\"32\" height=\"32\" />View the docs</a></td></table>"
"metadata": {
"colab": {
"name": "FlatMap - element-wise transform",
"toc_visible": true
"kernelspec": {
"display_name": "python3",
"name": "python3"
"nbformat": 4,
"nbformat_minor": 4