| { |
| "nbformat": 4, |
| "nbformat_minor": 0, |
| "metadata": { |
| "colab": { |
| "name": "Getting started -- Tour of Beam", |
| "provenance": [], |
| "toc_visible": true, |
| "include_colab_link": true |
| }, |
| "kernelspec": { |
| "name": "python3", |
| "display_name": "Python 3" |
| } |
| }, |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "view-in-github", |
| "colab_type": "text" |
| }, |
| "source": [ |
| "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/getting-started.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "SSKEd7tP-b2k", |
| "cellView": "form" |
| }, |
| "source": [ |
| "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", |
| "\n", |
| "# Licensed to the Apache Software Foundation (ASF) under one\n", |
| "# or more contributor license agreements. See the NOTICE file\n", |
| "# distributed with this work for additional information\n", |
| "# regarding copyright ownership. The ASF licenses this file\n", |
| "# to you under the Apache License, Version 2.0 (the\n", |
| "# \"License\"); you may not use this file except in compliance\n", |
| "# with the License. You may obtain a copy of the License at\n", |
| "#\n", |
| "# http://www.apache.org/licenses/LICENSE-2.0\n", |
| "#\n", |
| "# Unless required by applicable law or agreed to in writing,\n", |
| "# software distributed under the License is distributed on an\n", |
| "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", |
| "# KIND, either express or implied. See the License for the\n", |
| "# specific language governing permissions and limitations\n", |
| "# under the License." |
| ], |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "5UC_aGanx6oE" |
| }, |
| "source": [ |
| "# Getting started: _Tour of Beam_\n", |
| "\n", |
| "[Apache Beam](https://beam.apache.org/get-started/beam-overview/)\n", |
| "is a library for parallel data processing.\n", |
| "\n", |
| "Beam is commonly used for\n", |
| "[Extract-Transform-Load (ETL)](https://en.wikipedia.org/wiki/Extract,_transform,_load)\n", |
| "jobs, where we _extract_ data from a data source, _transform_ that data, and _load_ it into a data sink like a database.\n", |
| "It handles large amounts of data particularly well, since it can use multiple machines to process everything in parallel.\n", |
| "\n", |
| "Let's begin by installing the `apache-beam` package with `pip`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "R_Yhoc6N_Flg" |
| }, |
| "source": [ |
| "# Install apache-beam with pip.\n", |
| "!pip install --quiet apache-beam" |
| ], |
| "execution_count": null, |
| "outputs": [] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "WwxLB5EiVKP_" |
| }, |
| "source": [ |
| "You can express a _data processing pipeline_, and then run it on the\n", |
| "[_runner_ of your choice](https://beam.apache.org/documentation/runners/capability-matrix/).\n", |
| "To keep things simple, we use the `DirectRunner` for now, which runs the pipeline locally." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "8rfQeLk2y9lx" |
| }, |
| "source": [ |
| "# What is a _pipeline_?\n", |
| "\n", |
| "A __pipeline__ is a __sequence of data transformations__.\n", |
| "You can think of it like a production line:\n", |
| "data comes in at one end\n", |
| "and gets transformed by each step.\n", |
| "The outputs from one step are passed as inputs to the next step.\n", |
| "\n", |
| "In Beam, your data lives in a __`PCollection`__,\n", |
| "which stands for _Parallel Collection_.\n", |
| "A `PCollection` is like a __list of elements__,\n", |
| "but without any order guarantees.\n", |
| "This allows Beam to easily parallelize and distribute\n", |
| "the `PCollection`'s elements." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "LaljqKvs4UU5" |
| }, |
| "source": [ |
| "Once you have your data, the next step is to transform it.\n", |
| "In Beam, you transform data using **`PTransform`**s,\n", |
| "which stands for _Parallel Transform_.\n", |
| "A `PTransform` is like a __function__:\n", |
| "it takes some inputs, transforms them, and creates some outputs." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "SVk4BsuKVlLj" |
| }, |
| "source": [ |
| "Now let's dive into creating our first pipeline.\n", |
| "\n", |
| "For this first pipeline, let's just feed it some data from a Python list and print the results.\n", |
| "\n", |
| "Each _step_ in the pipeline is delimited by the _pipe operator_ `|`.\n", |
| "The outputs of each transform are passed to the next transform as inputs.\n", |
| "We can then save the final results into a `PCollection` variable.\n", |
| "\n", |
| "```py\n", |
| "# We pass the elements from step1 through step3 and save the results into `outputs`.\n", |
| "outputs = pipeline | step1 | step2 | step3\n", |
| "```\n", |
| "\n", |
| "Pipelines can quickly grow long, so it's sometimes easier to read if we surround them with parentheses and break them into multiple lines.\n", |
| "\n", |
| "```py\n", |
| "# This is equivalent to the example above.\n", |
| "outputs = (\n", |
| " pipeline\n", |
| " | step1\n", |
| " | step2\n", |
| " | step3\n", |
| ")\n", |
| "```\n", |
| "\n", |
| "Beam also expects each transform, or step, to have a unique _label_, or description.\n", |
| "Labels make debugging a lot easier, and adding them is a good habit to build from the start.\n", |
| "You can use the _right shift operator_ `>>` to add a label to your transforms, like `'My description' >> MyTransform`.\n", |
| "\n", |
| "```py\n", |
| "# Try to give short but descriptive labels.\n", |
| "# These serve both as comments and help debug later on.\n", |
| "outputs = (\n", |
| " pipeline\n", |
| " | 'First step' >> step1\n", |
| " | 'Second step' >> step2\n", |
| " | 'Third step' >> step3\n", |
| ")\n", |
| "```\n", |
| "\n", |
| "> ℹ️ The syntax might seem a little different at first, but you'll become familiar with it.\n", |
| "\n", |
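| "This syntax relies on Python operator overloading.\n", |
| "As a rough sketch, the toy classes below show how `|` and `>>` can be given these meanings.\n", |
| "`Step` and `Source` are made-up names for illustration only; this is not Beam's actual implementation.\n", |
| "\n", |
| "```py\n", |
| "# Toy model only: `Step` and `Source` are hypothetical, not Beam classes.\n", |
| "class Step:\n", |
| "  def __init__(self, fn, label=None):\n", |
| "    self.fn = fn\n", |
| "    self.label = label\n", |
| "\n", |
| "  def __rrshift__(self, label):\n", |
| "    # Called for `'label' >> step`, because `str` does not implement `>>`.\n", |
| "    return Step(self.fn, label)\n", |
| "\n", |
| "\n", |
| "class Source:\n", |
| "  def __init__(self, elements):\n", |
| "    self.elements = list(elements)\n", |
| "\n", |
| "  def __or__(self, step):\n", |
| "    # Called for `source | step`: apply the step to every element\n", |
| "    # and return a new Source holding the results.\n", |
| "    return Source(step.fn(x) for x in self.elements)\n", |
| "\n", |
| "\n", |
| "outputs = Source([0, 1, 2, 3]) | 'Add one' >> Step(lambda x: x + 1)\n", |
| "print(outputs.elements)  # [1, 2, 3, 4]\n", |
| "```\n", |
| "\n", |
| "In Python, `>>` binds more tightly than `|`, so each label attaches to its transform before the step is piped.\n", |
| "\n", |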
| "We use the `Create` transform to feed the pipeline with an\n", |
| "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable)\n", |
| "of elements, like a `list`.\n", |
| "\n", |
| "Let's see what happens if we try to `print` a `PCollection`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "XAke4d5lV5f5", |
| "outputId": "eaf5954c-3a66-4ef7-a258-35797ce0bbb9", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| } |
| }, |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "inputs = [0, 1, 2, 3]\n", |
| "\n", |
| "# Create a pipeline.\n", |
| "with beam.Pipeline() as pipeline:\n", |
| " # Feed it some input elements with `Create`.\n", |
| " outputs = (\n", |
| " pipeline\n", |
| " | 'Create initial values' >> beam.Create(inputs)\n", |
| " )\n", |
| "\n", |
| " # `outputs` is a PCollection with our input elements.\n", |
| " # But printing it directly won't show us its contents :(\n", |
| " print(f\"outputs: {outputs}\")" |
| ], |
| "execution_count": 3, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "text": [ |
| "outputs: PCollection[[3]: Create initial values/Map(decode).None]\n" |
| ], |
| "name": "stdout" |
| } |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "h0UUmpwRADqA" |
| }, |
| "source": [ |
| "> ℹ️ In Beam, you can __NOT__ access the elements of a `PCollection` directly like you would with a Python list.\n", |
| "> This means we can't simply `print` the output `PCollection` to see its elements.\n", |
| ">\n", |
| "> This is because, depending on the runner,\n", |
| "> the `PCollection` elements might live on multiple worker machines.\n", |
| "\n", |
| "To print the elements in the `PCollection`, we'll use a little trick that we'll explain shortly." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "IblMlbE8_4CJ", |
| "outputId": "dc825083-d157-4fae-fff9-4db33dbc0876", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| } |
| }, |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "inputs = [0, 1, 2, 3]\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| " outputs = (\n", |
| " pipeline\n", |
| " | 'Create initial values' >> beam.Create(inputs)\n", |
| " )\n", |
| "\n", |
| " # We can only access the elements through another transform.\n", |
| " # Don't worry if you don't know what's happening here,\n", |
| " # we'll get to it next :)\n", |
| " outputs | beam.Map(print)" |
| ], |
| "execution_count": 4, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "text": [ |
| "0\n", |
| "1\n", |
| "2\n", |
| "3\n" |
| ], |
| "name": "stdout" |
| } |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "FY9TP3Tw5eZt" |
| }, |
| "source": [ |
| "# Transforming data\n", |
| "\n", |
| "Apache Beam is designed with a [functional paradigm](https://en.wikipedia.org/wiki/Functional_programming).\n", |
| "This means that, instead of _loops_, it uses `PTransform`s along with _functions_ to process each element in a `PCollection`.\n", |
| "\n", |
| "Let's go through some of the most common and basic data transforms in Beam." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "UMKgi9LD6-xb" |
| }, |
| "source": [ |
| "## Map: _one-to-one_\n", |
| "\n", |
| "Let's say we have some elements and we want to do something with each element.\n", |
| "\n", |
| "We want to `map` a function to each element of the collection.\n", |
| "\n", |
| "`map` takes a _function_ that transforms a single input `a` into a single output `b`.\n", |
| "\n", |
| "> ℹ️ For example, we want to multiply each element by 2." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "G0NJrjja9YRF" |
| }, |
| "source": [ |
| "In Python, this is commonly done with the\n", |
| "[built-in `map` function](https://docs.python.org/3/library/functions.html#map), or with\n", |
| "[list comprehensions](https://docs.python.org/3/tutorial/datastructures.html#list-comprehensions)." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "MHzR7CuZqcQq", |
| "outputId": "295e24fb-506e-4080-ee04-755ae533888b", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| } |
| }, |
| "source": [ |
| "inputs = [0, 1, 2, 3]\n", |
| "\n", |
| "# Using the `map` function.\n", |
| "outputs = map(lambda x: x * 2, inputs)\n", |
| "print(list(outputs))\n", |
| "\n", |
| "# Using a list comprehension.\n", |
| "outputs = [x * 2 for x in inputs]\n", |
| "print(outputs)\n", |
| "\n", |
| "# Roughly equivalent for loop.\n", |
| "outputs = []\n", |
| "for x in inputs:\n", |
| " outputs.append(x * 2)\n", |
| "print(outputs)" |
| ], |
| "execution_count": 5, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "text": [ |
| "[0, 2, 4, 6]\n", |
| "[0, 2, 4, 6]\n", |
| "[0, 2, 4, 6]\n" |
| ], |
| "name": "stdout" |
| } |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "4vaBuNFW-5FG" |
| }, |
| "source": [ |
| "In Beam, there is the\n", |
| "[`Map` transform](https://beam.apache.org/documentation/transforms/python/elementwise/map/),\n", |
| "but we must use it within a pipeline.\n", |
| "\n", |
| "First we create a pipeline and feed it our input elements.\n", |
| "Then we _pipe_ those elements into a `Map` transform where we apply our function." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "LoYq871Q96iu", |
| "outputId": "57c2155f-cebb-42ae-e175-e1554f2f806b", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| } |
| }, |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "inputs = [0, 1, 2, 3]\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| " outputs = (\n", |
| " pipeline\n", |
| " | 'Create values' >> beam.Create(inputs)\n", |
| " | 'Multiply by 2' >> beam.Map(lambda x: x * 2)\n", |
| " )\n", |
| "\n", |
| " outputs | beam.Map(print)" |
| ], |
| "execution_count": null, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "text": [ |
| "0\n", |
| "2\n", |
| "4\n", |
| "6\n" |
| ], |
| "name": "stdout" |
| } |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "Q06jvwmqOzer" |
| }, |
| "source": [ |
| "> ℹ️ Now that we know how `Map` works, we can see what's happening when we print the elements.\n", |
| ">\n", |
| "> We have our outputs stored in the `outputs` `PCollection`, so we _pipe_ it to a `Map` transform to apply the\n", |
| "> [`print`](https://docs.python.org/3/library/functions.html#print)\n", |
| "> function.\n", |
| ">\n", |
| "> Note that `print` returns `None`, so we get an output `PCollection` of all `None` elements.\n", |
| "> But we are not saving its results to any variable,\n", |
| "> so they get discarded.\n", |
| ">\n", |
| "> This does _not_ affect the values in `outputs` in any way." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "m8AVwzVyCAPD" |
| }, |
| "source": [ |
| "## FlatMap: _one-to-many_\n", |
| "\n", |
| "`Map` allows us to transform each individual element,\n", |
| "but we can't change the number of elements with it.\n", |
| "\n", |
| "Suppose we `map` a function to each element of a collection,\n", |
| "and that function returns a _list of output elements_.\n", |
| "We would then get a _list of lists of elements_,\n", |
| "which we want to _flatten_ into a single _list_.\n", |
| "\n", |
| "`flatMap` takes a function that transforms a single input `a` into an `iterable` of outputs `b`.\n", |
| "The result is a _single collection_ containing the outputs of _all_ the elements.\n", |
| "\n", |
| "> ℹ️ For example, we want to repeat each element as many times as its value.\n", |
| "> For a value `1` we want one element, and for a value `3` we want three elements." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "QdQEEbtNKpQJ" |
| }, |
| "source": [ |
| "In Python this could be done with a _nested list comprehension_,\n", |
| "but it's a little tricky to read." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "0xqSUP5mKNqI", |
| "outputId": "58251379-ed6f-4452-e9ea-60e59394ee5a", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| } |
| }, |
| "source": [ |
| "inputs = [0, 1, 2, 3]\n", |
| "\n", |
| "# Using list comprehensions.\n", |
| "mapOutputs = [[x for _ in range(x)] for x in inputs]\n", |
| "# After the map function, flatten the results.\n", |
| "outputs = [x for xs in mapOutputs for x in xs]\n", |
| "print(outputs)\n", |
| "\n", |
| "# Roughly equivalent for loop.\n", |
| "outputs = []\n", |
| "for x in inputs:\n", |
| " outputs += [x for _ in range(x)]\n", |
| "print(outputs)" |
| ], |
| "execution_count": null, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "text": [ |
| "[1, 2, 2, 3, 3, 3]\n", |
| "[1, 2, 2, 3, 3, 3]\n" |
| ], |
| "name": "stdout" |
| } |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "OP3bSQoYMOH6" |
| }, |
| "source": [ |
| "The good news is that Beam already has a\n", |
| "[`FlatMap` transform](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap/)\n", |
| "built-in, so it's actually easier than plain Python.\n", |
| "\n", |
| "`FlatMap` accepts a function that takes a single input element and outputs an `iterable` of elements." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "gZbsyAWaMT-F", |
| "outputId": "f2623c5c-d709-4229-c293-952b9bbcbef4", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| } |
| }, |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "inputs = [0, 1, 2, 3]\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| " outputs = (\n", |
| " pipeline\n", |
| " | 'Create values' >> beam.Create(inputs)\n", |
| " | 'Expand elements' >> beam.FlatMap(lambda x: [x for _ in range(x)])\n", |
| " )\n", |
| "\n", |
| " outputs | beam.Map(print)" |
| ], |
| "execution_count": null, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "text": [ |
| "1\n", |
| "2\n", |
| "2\n", |
| "3\n", |
| "3\n", |
| "3\n" |
| ], |
| "name": "stdout" |
| } |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "YHcWp18cMefc" |
| }, |
| "source": [ |
| "> ℹ️ Try replacing the `FlatMap` transform with `Map` to see how they behave differently." |
| ] |
| }, |
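| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "As a point of comparison for the exercise above, here is a sketch that passes the same function to both transforms.\n", |
| "The helper name `repeat` and the labels are our own choices for this example.\n", |
| "Keep in mind that the order of the printed elements is not guaranteed.\n", |
| "\n", |
| "```py\n", |
| "import apache_beam as beam\n", |
| "\n", |
| "def repeat(x):\n", |
| "  # Returns a list with `x` copies of `x`.\n", |
| "  return [x for _ in range(x)]\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  values = pipeline | 'Create values' >> beam.Create([0, 1, 2, 3])\n", |
| "\n", |
| "  # `Map` keeps one output per input, so the elements are\n", |
| "  # the lists [], [1], [2, 2] and [3, 3, 3].\n", |
| "  values | 'Repeat with Map' >> beam.Map(repeat) | 'Print lists' >> beam.Map(print)\n", |
| "\n", |
| "  # `FlatMap` flattens the returned lists, so the elements are\n", |
| "  # the values 1, 2, 2, 3, 3 and 3.\n", |
| "  values | 'Repeat with FlatMap' >> beam.FlatMap(repeat) | 'Print values' >> beam.Map(print)\n", |
| "```\n", |
| "\n", |
| "Each step gets its own label here because the same `PCollection` feeds two branches of the pipeline." |
| ] |
| }, |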
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "ad-O7lUxOoyD" |
| }, |
| "source": [ |
| "## Filter: _one-to-zero_\n", |
| "\n", |
| "Sometimes we want to *only* process certain elements while ignoring others.\n", |
| "\n", |
| "We want to `filter` each element in a collection using a function.\n", |
| "\n", |
| "`filter` takes a function that checks a single element `a`,\n", |
| "and returns `True` to keep the element, or `False` to discard it.\n", |
| "\n", |
| "> ℹ️ For example, we only want to keep numbers that are *even*, that is, divisible by two.\n", |
| "> We can use the\n", |
| "> [modulo operator `%`](https://en.wikipedia.org/wiki/Modulo_operation)\n", |
| "> for a simple check." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "Y-gWtFNockwq" |
| }, |
| "source": [ |
| "In Python we can do this with *list comprehensions* as well." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "4xXSy5Bga9td", |
| "outputId": "90d5d9a7-d925-4b11-91c8-e6e844f9d21e", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| } |
| }, |
| "source": [ |
| "inputs = [0, 1, 2, 3]\n", |
| "\n", |
| "# Using a list comprehension.\n", |
| "outputs = [x for x in inputs if x % 2 == 0]\n", |
| "print(outputs)\n", |
| "\n", |
| "# Roughly equivalent for loop.\n", |
| "outputs = []\n", |
| "for x in inputs:\n", |
| "  if x % 2 == 0:\n", |
| "    outputs.append(x)\n", |
| "print(outputs)" |
| ], |
| "execution_count": null, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "text": [ |
| "[0, 2]\n", |
| "[0, 2]\n" |
| ], |
| "name": "stdout" |
| } |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "mtVSijNAc0Fi" |
| }, |
| "source": [ |
| "In Beam, there is the\n", |
| "[`Filter` transform](https://beam.apache.org/documentation/transforms/python/elementwise/filter/)." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "l7W9UafLkiO1", |
| "outputId": "a59e0cb0-3cb9-43a6-8043-5789380d938c", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| } |
| }, |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "inputs = [0, 1, 2, 3]\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| " outputs = (\n", |
| " pipeline\n", |
| " | 'Create values' >> beam.Create(inputs)\n", |
| " | 'Keep only even numbers' >> beam.Filter(lambda x: x % 2 == 0)\n", |
| " )\n", |
| "\n", |
| " outputs | beam.Map(print)" |
| ], |
| "execution_count": null, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "text": [ |
| "0\n", |
| "2\n" |
| ], |
| "name": "stdout" |
| } |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "_yYNTszgktLY" |
| }, |
| "source": [ |
| "## Combine: _many-to-one_\n", |
| "\n", |
| "We also need a way to get a single value from an entire `PCollection`.\n", |
| "We might want to get the total number of elements, or the average value, or any other type of _aggregation_ of values.\n", |
| "\n", |
| "We want to `combine` the elements in a collection into a single output.\n", |
| "\n", |
| "`combine` takes a function that reduces an `iterable` of inputs `a` to a single output `a`.\n", |
| "\n", |
| "Other common names for this function are `fold` and `reduce`.\n", |
| "\n", |
| "> ℹ️ For example, we want to add all numbers together." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "F60IfKPOmlkg" |
| }, |
| "source": [ |
| "In Python this is usually achieved with the\n", |
| "[`reduce` function](https://docs.python.org/3/library/functools.html#functools.reduce)." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "4qjUZvT2k878", |
| "outputId": "1868d167-529e-4e5f-ec27-3ba405226a1d", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| } |
| }, |
| "source": [ |
| "from functools import reduce\n", |
| "\n", |
| "inputs = [0, 1, 2, 3]\n", |
| "\n", |
| "# Using reduce (most general way).\n", |
| "output = reduce(lambda x, y: x + y, inputs, 0)\n", |
| "print(output)\n", |
| "\n", |
| "# Using the built-in sum function, which is itself a \"reduce\" function.\n", |
| "output = sum(inputs)\n", |
| "print(output)\n", |
| "\n", |
| "# Roughly equivalent for loop.\n", |
| "output = 0\n", |
| "for x in inputs:\n", |
| " y = output\n", |
| " output = x + y\n", |
| "print(output)" |
| ], |
| "execution_count": null, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "text": [ |
| "6\n", |
| "6\n", |
| "6\n" |
| ], |
| "name": "stdout" |
| } |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "kotSZnlZoO8D" |
| }, |
| "source": [ |
| "In Beam, there are\n", |
| "[aggregation transforms](https://beam.apache.org/documentation/transforms/python/overview/#aggregation).\n", |
| "\n", |
| "For this particular example, we can use the\n", |
| "[`CombineGlobally` transform](https://beam.apache.org/documentation/transforms/python/aggregation/sum/)\n", |
| "which accepts a function that takes an iterable of elements as an input and outputs a single value.\n", |
| "\n", |
| "We can pass the\n", |
| "[built-in function `sum`](https://docs.python.org/3/library/functions.html#sum)\n", |
| "into `CombineGlobally`." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "PoRd7hlnoOu5", |
| "outputId": "b34f987f-ddb3-4808-95eb-9ac6140d2ea2", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| } |
| }, |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "inputs = [0, 1, 2, 3]\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| " outputs = (\n", |
| " pipeline\n", |
| " | 'Create values' >> beam.Create(inputs)\n", |
| " | 'Sum all values together' >> beam.CombineGlobally(sum)\n", |
| " )\n", |
| "\n", |
| " outputs | beam.Map(print)" |
| ], |
| "execution_count": null, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "text": [ |
| "6\n" |
| ], |
| "name": "stdout" |
| } |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "pFb98ioSp9YU" |
| }, |
| "source": [ |
| "> ℹ️ There are many ways to combine values in Beam.\n", |
| "> You could even combine them into a different data type by defining a custom `CombineFn`.\n", |
| ">\n", |
| "> You can learn more about them by checking the available\n", |
| "> [aggregation transforms](https://beam.apache.org/documentation/transforms/python/overview/#aggregation)." |
| ] |
| }, |
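| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "As an illustration of a custom `CombineFn`, the sketch below computes an average,\n", |
| "so a collection of integers is combined into a single `float`.\n", |
| "The class name `AverageFn` and the labels are our own choices for this example;\n", |
| "the four overridden methods are the ones a `CombineFn` is expected to define.\n", |
| "\n", |
| "```py\n", |
| "import apache_beam as beam\n", |
| "\n", |
| "class AverageFn(beam.CombineFn):\n", |
| "  def create_accumulator(self):\n", |
| "    # The accumulator is a (running total, element count) pair.\n", |
| "    return (0, 0)\n", |
| "\n", |
| "  def add_input(self, accumulator, element):\n", |
| "    total, count = accumulator\n", |
| "    return (total + element, count + 1)\n", |
| "\n", |
| "  def merge_accumulators(self, accumulators):\n", |
| "    # Partial results may come from different workers, so we merge them.\n", |
| "    totals, counts = zip(*accumulators)\n", |
| "    return (sum(totals), sum(counts))\n", |
| "\n", |
| "  def extract_output(self, accumulator):\n", |
| "    total, count = accumulator\n", |
| "    return total / count if count else float('nan')\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  (\n", |
| "      pipeline\n", |
| "      | 'Create values' >> beam.Create([0, 1, 2, 3])\n", |
| "      | 'Average all values' >> beam.CombineGlobally(AverageFn())\n", |
| "      | 'Print average' >> beam.Map(print)\n", |
| "  )\n", |
| "```\n", |
| "\n", |
| "For the inputs `[0, 1, 2, 3]` this prints `1.5`.\n", |
| "Keeping an accumulator separate from the final output is what lets the runner combine partial results in parallel." |
| ] |
| }, |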
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "cOnJoaE3qmMv" |
| }, |
| "source": [ |
| "## GroupByKey: _group related elements_\n", |
| "\n", |
| "Sometimes it's useful to pair each element with a *key* that we can use to group related elements together.\n", |
| "\n", |
| "Think of it as creating a\n", |
| "[Python `dict`](https://docs.python.org/3/tutorial/datastructures.html#dictionaries)\n", |
| "from a list of `(key, value)` pairs,\n", |
| "but instead of replacing the value on a \"duplicate\" key,\n", |
| "you would get a list of all the values associated with that key.\n", |
| "\n", |
| "> ℹ️ For example, we want to group each animal with the list of foods they like, and we start with `(animal, food)` pairs." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "Sic8de3btYli" |
| }, |
| "source": [ |
| "There's no built-in function for `groupByKey` in plain Python,\n", |
| "but here's a simple implementation." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "EzJPZJCWtWF9", |
| "outputId": "cd972c46-7c05-49ab-a57d-e6e655349544", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| } |
| }, |
| "source": [ |
| "from functools import reduce\n", |
| "\n", |
| "inputs = [\n", |
| " ('🐹', '🌽'),\n", |
| " ('🐼', '🎋'),\n", |
| " ('🐰', '🥕'),\n", |
| " ('🐹', '🌰'),\n", |
| " ('🐰', '🥒'),\n", |
| "]\n", |
| "\n", |
| "\n", |
| "# Since we're getting a single dict from all the elements,\n", |
| "# we can use reduce for this.\n", |
| "def groupByKey(result, keyValue):\n", |
| " key, value = keyValue\n", |
| " values = result.get(key, []) + [value]\n", |
| " return {**result, key: values}\n", |
| "output = reduce(groupByKey, inputs, {})\n", |
| "print(output)\n", |
| "\n", |
| "\n", |
| "# Roughly equivalent for loop.\n", |
| "output = {}\n", |
| "for key, value in inputs:\n", |
| " values = output.get(key, []) + [value]\n", |
| " output = {**output, key: values}\n", |
| "print(output)" |
| ], |
| "execution_count": null, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "text": [ |
| "{'🐹': ['🌽', '🌰'], '🐼': ['🎋'], '🐰': ['🥕', '🥒']}\n", |
| "{'🐹': ['🌽', '🌰'], '🐼': ['🎋'], '🐰': ['🥕', '🥒']}\n" |
| ], |
| "name": "stdout" |
| } |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "1EmT-9Zawa_q" |
| }, |
| "source": [ |
| "In Beam, there is the\n", |
| "[`GroupByKey` transform](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/)." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "metadata": { |
| "id": "67fZUiWiwnAf", |
| "outputId": "f95c455c-c554-41de-bc8c-e755584064d2", |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| } |
| }, |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "inputs = [\n", |
| " ('🐹', '🌽'),\n", |
| " ('🐼', '🎋'),\n", |
| " ('🐰', '🥕'),\n", |
| " ('🐹', '🌰'),\n", |
| " ('🐰', '🥒'),\n", |
| "]\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| " outputs = (\n", |
| " pipeline\n", |
| " | 'Create (animal, food) pairs' >> beam.Create(inputs)\n", |
| " | 'Group foods by animals' >> beam.GroupByKey()\n", |
| " )\n", |
| "\n", |
| " outputs | beam.Map(print)" |
| ], |
| "execution_count": null, |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "text": [ |
| "('🐹', ['🌽', '🌰'])\n", |
| "('🐼', ['🎋'])\n", |
| "('🐰', ['🥕', '🥒'])\n" |
| ], |
| "name": "stdout" |
| } |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "gnoz_mWtxSjW" |
| }, |
| "source": [ |
| "# What's next?\n", |
| "\n", |
| "* [Reading and writing data](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb) --\n", |
| "  how to read and write data to and from different data formats.\n", |
| "* [Transform catalog](https://beam.apache.org/documentation/transforms/python/overview) --\n", |
| " check out all the available transforms.\n", |
| "* [Mobile gaming example](https://beam.apache.org/get-started/mobile-gaming-example) --\n", |
| " learn more about windowing, triggers, and streaming through a complete example pipeline.\n", |
| "* [Runners](https://beam.apache.org/documentation/runners/capability-matrix) --\n", |
| " check the available runners, their capabilities, and how to run your pipeline on them." |
| ] |
| } |
| ] |
| } |