| { |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "view-in-github" |
| }, |
| "source": [ |
| "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/documentation/transforms/python/elementwise/partition-py.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open in Colab\"/></a>" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "view-the-docs-top" |
| }, |
| "source": [ |
| "<table align=\"left\"><td><a target=\"_blank\" href=\"https://beam.apache.org/documentation/transforms/python/elementwise/partition\"><img src=\"https://beam.apache.org/images/logos/full-color/name-bottom/beam-logo-full-color-name-bottom-100.png\" width=\"32\" height=\"32\" />View the docs</a></td></table>" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "cellView": "form", |
| "id": "_-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "#@title Licensed under the Apache License, Version 2.0 (the \"License\")\n", |
| "# Licensed to the Apache Software Foundation (ASF) under one\n", |
| "# or more contributor license agreements. See the NOTICE file\n", |
| "# distributed with this work for additional information\n", |
| "# regarding copyright ownership. The ASF licenses this file\n", |
| "# to you under the Apache License, Version 2.0 (the\n", |
| "# \"License\"); you may not use this file except in compliance\n", |
| "# with the License. You may obtain a copy of the License at\n", |
| "#\n", |
| "# http://www.apache.org/licenses/LICENSE-2.0\n", |
| "#\n", |
| "# Unless required by applicable law or agreed to in writing,\n", |
| "# software distributed under the License is distributed on an\n", |
| "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", |
| "# KIND, either express or implied. See the License for the\n", |
| "# specific language governing permissions and limitations\n", |
| "# under the License." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "partition" |
| }, |
| "source": [ |
| "# Partition\n", |
| "\n", |
| "<script type=\"text/javascript\">\n", |
| "localStorage.setItem('language', 'language-py')\n", |
| "</script>\n", |
| "\n", |
| "<table align=\"left\" style=\"margin-right:1em\">\n", |
| " <td>\n", |
| " <a class=\"button\" target=\"_blank\" href=\"https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.Partition\"><img src=\"https://beam.apache.org/images/logos/sdks/python.png\" width=\"32px\" height=\"32px\" alt=\"Pydoc\"/> Pydoc</a>\n", |
| " </td>\n", |
| "</table>\n", |
| "\n", |
| "<br/><br/><br/>\n", |
| "\n", |
| "Separates elements in a collection into multiple output\n", |
| "collections. The partitioning function contains the logic that determines how\n", |
| "to separate the elements of the input collection into each resulting\n", |
| "partition output collection.\n", |
| "\n", |
| "The number of partitions must be determined at graph construction time.\n", |
| "You cannot determine the number of partitions mid-pipeline.\n", |
| "\n", |
| "See more information in the [Beam Programming Guide](https://beam.apache.org/documentation/programming-guide/#partition)." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "setup" |
| }, |
| "source": [ |
| "## Setup\n", |
| "\n", |
| "To run a code cell, you can click the **Run cell** button at the top left of the cell,\n", |
| "or select it and press **`Shift+Enter`**.\n", |
| "Try modifying a code cell and re-running it to see what happens.\n", |
| "\n", |
| "> To learn more about Colab, see\n", |
| "> [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb).\n", |
| "\n", |
| "First, let's install the `apache-beam` module." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "setup-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "!pip install --quiet -U apache-beam" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "examples" |
| }, |
| "source": [ |
| "## Examples\n", |
| "\n", |
| "In the following examples, we create a pipeline with a `PCollection` of produce with their icon, name, and duration.\n", |
| "Then, we apply `Partition` in multiple ways to split the `PCollection` into multiple `PCollections`.\n", |
| "\n", |
| "`Partition` accepts a function that receives each element and the number of partitions,\n", |
| "and returns the index of the desired partition for that element.\n", |
| "The number of partitions passed must be a positive integer,\n", |
| "and the function must return an integer in the range `0` to `num_partitions-1`." |
| ] |
| }, |
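| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "examples-partition-function-contract" |
| }, |
| "source": [ |
| "As a minimal sketch of this contract, a partitioning function can be exercised as plain Python, without a pipeline.\n", |
| "The `checked_partition` helper and the sample element are hypothetical and exist only for illustration;\n", |
| "they are not part of the Beam API.\n", |
| "\n", |
| "```python\n", |
| "durations = ['annual', 'biennial', 'perennial']\n", |
| "\n", |
| "def by_duration(plant, num_partitions):\n", |
| "  # The position of the duration in the list doubles as the partition index.\n", |
| "  return durations.index(plant['duration'])\n", |
| "\n", |
| "def checked_partition(partition_fn, element, num_partitions):\n", |
| "  # Hypothetical helper: verifies the index lies in 0..num_partitions-1.\n", |
| "  index = partition_fn(element, num_partitions)\n", |
| "  if not 0 <= index < num_partitions:\n", |
| "    raise ValueError('partition index out of range: %d' % index)\n", |
| "  return index\n", |
| "\n", |
| "carrot = {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'}\n", |
| "print(checked_partition(by_duration, carrot, len(durations)))  # prints 1\n", |
| "```" |
| ] |
| }, |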
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-1-partition-with-a-function" |
| }, |
| "source": [ |
| "### Example 1: Partition with a function\n", |
| "\n", |
| "In the following example, we have a known list of durations.\n", |
| "We partition the `PCollection` into one `PCollection` for every duration type." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "example-1-partition-with-a-function-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "durations = ['annual', 'biennial', 'perennial']\n", |
| "\n", |
| "def by_duration(plant, num_partitions):\n", |
| "  return durations.index(plant['duration'])\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  annuals, biennials, perennials = (\n", |
| "      pipeline\n", |
| "      | 'Gardening plants' >> beam.Create([\n", |
| "          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},\n", |
| "          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},\n", |
| "          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},\n", |
| "          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},\n", |
| "          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},\n", |
| "      ])\n", |
| "      | 'Partition' >> beam.Partition(by_duration, len(durations))\n", |
| "  )\n", |
| "  _ = (\n", |
| "      annuals\n", |
| "      | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))\n", |
| "  )\n", |
| "  _ = (\n", |
| "      biennials\n", |
| "      | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))\n", |
| "  )\n", |
| "  _ = (\n", |
| "      perennials\n", |
| "      | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))\n", |
| "  )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-1-partition-with-a-function-2" |
| }, |
| "source": [ |
| "<table align=\"left\" style=\"margin-right:1em\">\n", |
| " <td>\n", |
| " <a class=\"button\" target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition.py\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n", |
| " </td>\n", |
| "</table>\n", |
| "\n", |
| "<br/><br/><br/>" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-2-partition-with-a-lambda-function" |
| }, |
| "source": [ |
| "### Example 2: Partition with a lambda function\n", |
| "\n", |
| "We can also use lambda functions to simplify **Example 1**." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "example-2-partition-with-a-lambda-function-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "durations = ['annual', 'biennial', 'perennial']\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  annuals, biennials, perennials = (\n", |
| "      pipeline\n", |
| "      | 'Gardening plants' >> beam.Create([\n", |
| "          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},\n", |
| "          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},\n", |
| "          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},\n", |
| "          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},\n", |
| "          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},\n", |
| "      ])\n", |
| "      | 'Partition' >> beam.Partition(\n", |
| "          lambda plant, num_partitions: durations.index(plant['duration']),\n", |
| "          len(durations),\n", |
| "      )\n", |
| "  )\n", |
| "  _ = (\n", |
| "      annuals\n", |
| "      | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))\n", |
| "  )\n", |
| "  _ = (\n", |
| "      biennials\n", |
| "      | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))\n", |
| "  )\n", |
| "  _ = (\n", |
| "      perennials\n", |
| "      | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))\n", |
| "  )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-2-partition-with-a-lambda-function-2" |
| }, |
| "source": [ |
| "<table align=\"left\" style=\"margin-right:1em\">\n", |
| " <td>\n", |
| " <a class=\"button\" target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition.py\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n", |
| " </td>\n", |
| "</table>\n", |
| "\n", |
| "<br/><br/><br/>" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-3-partition-with-multiple-arguments" |
| }, |
| "source": [ |
| "### Example 3: Partition with multiple arguments\n", |
| "\n", |
| "You can pass functions with multiple arguments to `Partition`.\n", |
| "The extra arguments are passed to the function as additional positional or keyword arguments.\n", |
| "\n", |
| "In machine learning, it is a common task to split data into\n", |
| "[training and testing datasets](https://en.wikipedia.org/wiki/Training,_validation,_and_test_sets).\n", |
| "Typically, 80% of the data is used for training a model and 20% is used for testing.\n", |
| "\n", |
| "In this example, we split a `PCollection` dataset into training and testing datasets.\n", |
| "We define `split_dataset`, which takes the `plant` element, `num_partitions`,\n", |
| "and an additional argument `ratio`.\n", |
| "The `ratio` is a list of numbers that represents how many items go into each partition relative to the others.\n", |
| "`num_partitions` is passed to `Partition` as a positional argument,\n", |
| "and `Partition` forwards it to `split_dataset` together with each `plant` element and the `ratio` keyword argument.\n", |
| "\n", |
| "If we want an 80%/20% split, we can specify a ratio of `[8, 2]`, which means that for every 10 elements,\n", |
| "8 go into the first partition and 2 go into the second.\n", |
| "To decide which partition receives each element, we assign every element to a bucket.\n", |
| "In our case, `[8, 2]` has **10** buckets,\n", |
| "where the first 8 buckets represent the first partition and the last 2 buckets represent the second partition.\n", |
| "\n", |
| "First, we check that the length of the ratio list matches the `num_partitions` we pass.\n", |
| "We then get a bucket index for each element, in the range from 0 to 9 (`num_buckets-1`, where `num_buckets` is `sum(ratio)`).\n", |
| "We could do `hash(element) % sum(ratio)`, but instead we sum the character codes of the\n", |
| "element's JSON representation to make it deterministic.\n", |
| "Finally, we loop through all the elements in the ratio and have a running total to\n", |
| "identify the partition index to which that bucket corresponds.\n", |
| "\n", |
| "This `split_dataset` function is generic enough to support any number of partitions by any ratio.\n", |
| "You might want to adapt the bucket assignment to use a more appropriate or randomized hash for your dataset." |
| ] |
| }, |
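| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-3-bucket-walkthrough" |
| }, |
| "source": [ |
| "The bucket logic described above can be tried on its own in plain Python, without a pipeline.\n", |
| "This is a minimal sketch: `bucket_for` and `partition_for_bucket` are hypothetical helper names\n", |
| "that split the steps of `split_dataset` into two functions for illustration.\n", |
| "\n", |
| "```python\n", |
| "import json\n", |
| "\n", |
| "def bucket_for(element, num_buckets):\n", |
| "  # Sum the character codes of the JSON text and wrap into the bucket range.\n", |
| "  return sum(map(ord, json.dumps(element))) % num_buckets\n", |
| "\n", |
| "def partition_for_bucket(bucket, ratio):\n", |
| "  # Keep a running total of the ratio; the first partition whose\n", |
| "  # cumulative size exceeds the bucket index owns that bucket.\n", |
| "  total = 0\n", |
| "  for i, part in enumerate(ratio):\n", |
| "    total += part\n", |
| "    if bucket < total:\n", |
| "      return i\n", |
| "  return len(ratio) - 1\n", |
| "\n", |
| "ratio = [8, 2]\n", |
| "assignments = [partition_for_bucket(b, ratio) for b in range(sum(ratio))]\n", |
| "print(assignments)  # prints [0, 0, 0, 0, 0, 0, 0, 0, 1, 1]\n", |
| "```\n", |
| "\n", |
| "Buckets 0 to 7 map to the first partition and buckets 8 and 9 map to the second, which gives the 80%/20% split." |
| ] |
| }, |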
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "example-3-partition-with-multiple-arguments-code" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "import json\n", |
| "\n", |
| "def split_dataset(plant, num_partitions, ratio):\n", |
| "  assert num_partitions == len(ratio)\n", |
| "  # Deterministic bucket index in the range 0 to sum(ratio) - 1.\n", |
| "  bucket = sum(map(ord, json.dumps(plant))) % sum(ratio)\n", |
| "  # Find the partition whose cumulative share of buckets contains this bucket.\n", |
| "  total = 0\n", |
| "  for i, part in enumerate(ratio):\n", |
| "    total += part\n", |
| "    if bucket < total:\n", |
| "      return i\n", |
| "  return len(ratio) - 1\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  train_dataset, test_dataset = (\n", |
| "      pipeline\n", |
| "      | 'Gardening plants' >> beam.Create([\n", |
| "          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},\n", |
| "          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},\n", |
| "          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},\n", |
| "          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},\n", |
| "          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},\n", |
| "      ])\n", |
| "      | 'Partition' >> beam.Partition(split_dataset, 2, ratio=[8, 2])\n", |
| "  )\n", |
| "  _ = (\n", |
| "      train_dataset\n", |
| "      | 'Train' >> beam.Map(lambda x: print('train: ' + str(x)))\n", |
| "  )\n", |
| "  _ = (\n", |
| "      test_dataset\n", |
| "      | 'Test' >> beam.Map(lambda x: print('test: ' + str(x)))\n", |
| "  )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "example-3-partition-with-multiple-arguments-2" |
| }, |
| "source": [ |
| "<table align=\"left\" style=\"margin-right:1em\">\n", |
| " <td>\n", |
| " <a class=\"button\" target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/partition.py\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\" width=\"32px\" height=\"32px\" alt=\"View source code\"/> View source code</a>\n", |
| " </td>\n", |
| "</table>\n", |
| "\n", |
| "<br/><br/><br/>" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "related-transforms" |
| }, |
| "source": [ |
| "## Related transforms\n", |
| "\n", |
| "* [Filter](https://beam.apache.org/documentation/transforms/python/elementwise/filter) is useful if the function is just\n", |
| " deciding whether to output an element or not.\n", |
| "* [ParDo](https://beam.apache.org/documentation/transforms/python/elementwise/pardo) is the most general elementwise mapping\n", |
| " operation, and includes other abilities such as multiple output collections and side-inputs.\n", |
| "* [CoGroupByKey](https://beam.apache.org/documentation/transforms/python/aggregation/cogroupbykey)\n", |
| "  performs a per-key equijoin.\n", |
| "\n", |
| "<table align=\"left\" style=\"margin-right:1em\">\n", |
| " <td>\n", |
| " <a class=\"button\" target=\"_blank\" href=\"https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.Partition\"><img src=\"https://beam.apache.org/images/logos/sdks/python.png\" width=\"32px\" height=\"32px\" alt=\"Pydoc\"/> Pydoc</a>\n", |
| " </td>\n", |
| "</table>\n", |
| "\n", |
| "<br/><br/><br/>" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "view-the-docs-bottom" |
| }, |
| "source": [ |
| "<table align=\"left\"><td><a target=\"_blank\" href=\"https://beam.apache.org/documentation/transforms/python/elementwise/partition\"><img src=\"https://beam.apache.org/images/logos/full-color/name-bottom/beam-logo-full-color-name-bottom-100.png\" width=\"32\" height=\"32\" />View the docs</a></td></table>" |
| ] |
| } |
| ], |
| "metadata": { |
| "colab": { |
| "name": "Partition - element-wise transform", |
| "toc_visible": true |
| }, |
| "kernelspec": { |
| "display_name": "python3", |
| "name": "python3" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 2 |
| } |