| { |
| "nbformat": 4, |
| "nbformat_minor": 2, |
| "metadata": { |
| "colab": { |
| "name": "Beam DataFrames", |
| "provenance": [], |
| "collapsed_sections": [], |
| "toc_visible": true |
| }, |
| "kernelspec": { |
| "name": "python3", |
| "display_name": "Python 3" |
| }, |
| "language_info": { |
| "name": "python" |
| } |
| }, |
| "cells": [ |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "source": [ |
| "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", |
| "\n", |
| "# Licensed to the Apache Software Foundation (ASF) under one\n", |
| "# or more contributor license agreements. See the NOTICE file\n", |
| "# distributed with this work for additional information\n", |
| "# regarding copyright ownership. The ASF licenses this file\n", |
| "# to you under the Apache License, Version 2.0 (the\n", |
| "# \"License\"); you may not use this file except in compliance\n", |
| "# with the License. You may obtain a copy of the License at\n", |
| "#\n", |
| "# http://www.apache.org/licenses/LICENSE-2.0\n", |
| "#\n", |
| "# Unless required by applicable law or agreed to in writing,\n", |
| "# software distributed under the License is distributed on an\n", |
| "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", |
| "# KIND, either express or implied. See the License for the\n", |
| "# specific language governing permissions and limitations\n", |
| "# under the License." |
| ], |
| "outputs": [], |
| "metadata": { |
| "cellView": "form", |
| "id": "rz2qIC9IL2rI" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "# Beam DataFrames\n", |
| "\n", |
| "<button>\n", |
| " <a href=\"https://beam.apache.org/documentation/dsls/dataframes/overview/\">\n", |
| " <img src=\"https://beam.apache.org/images/favicon.ico\" alt=\"Open the docs\" height=\"16\"/>\n", |
| " Beam DataFrames overview\n", |
| " </a>\n", |
| "</button>\n", |
| "\n", |
| "Beam DataFrames provide a pandas-like [DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n", |
| "API to declare Beam pipelines.\n", |
| "\n", |
| "> ℹ️ To learn more about Beam DataFrames, take a look at the\n", |
| "[Beam DataFrames overview](https://beam.apache.org/documentation/dsls/dataframes/overview) page.\n", |
| "\n", |
| "First, we need to install Apache Beam with the `interactive` extra for the Interactive runner.\n", |
| "We also need `pandas` for this notebook, but the Interactive runner already depends on it." |
| ], |
| "metadata": { |
| "id": "hDuXLLSZnI1D" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "source": [ |
| "%pip install --quiet apache-beam[interactive]" |
| ], |
| "outputs": [], |
| "metadata": { |
| "id": "8QVByaWjkarZ" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "Let's create a small data file of\n", |
| "[Comma-Separated Values (CSV)](https://en.wikipedia.org/wiki/Comma-separated_values).\n", |
| "It simply includes the dates of the\n", |
| "[equinoxes](https://en.wikipedia.org/wiki/Equinox) and\n", |
| "[solstices](https://en.wikipedia.org/wiki/Solstice)\n", |
| "of the year 2021." |
| ], |
| "metadata": { |
| "id": "aLqdbX4Mgipq" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "source": [ |
| "%%writefile solar_events.csv\n", |
| "timestamp,event\n", |
| "2021-03-20 09:37:00,March Equinox\n", |
| "2021-06-21 03:32:00,June Solstice\n", |
| "2021-09-22 19:21:00,September Equinox\n", |
| "2021-12-21 15:59:00,December Solstice" |
| ], |
| "outputs": [], |
| "metadata": { |
| "id": "hZjwAm7qotrJ" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "# Interactive Beam\n", |
| "\n", |
| "Pandas has the\n", |
| "[`pandas.read_csv`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html)\n", |
| "function to easily read CSV files into DataFrames.\n", |
| "Beam has the\n", |
| "[`beam.dataframe.io.read_csv`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_csv)\n", |
| "function that emulates `pandas.read_csv`, but returns a deferred Beam DataFrame.\n", |
| "\n", |
| "If you’re using\n", |
| "[Interactive Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html),\n", |
| "you can use `collect` to bring a Beam DataFrame into local memory as a Pandas DataFrame." |
| ], |
| "metadata": { |
| "id": "Hv_58JulleQ_" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 3, |
| "source": [ |
| "import apache_beam as beam\n", |
| "import apache_beam.runners.interactive.interactive_beam as ib\n", |
| "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n", |
| "\n", |
| "pipeline = beam.Pipeline(InteractiveRunner())\n", |
| "\n", |
| "# Create a deferred Beam DataFrame with the contents of our CSV file.\n", |
| "beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')\n", |
| "\n", |
| "# We can use `ib.collect` to view the contents of a Beam DataFrame.\n", |
| "ib.collect(beam_df)" |
| ], |
| "outputs": [ |
| { |
| "output_type": "display_data", |
| "data": { |
| "application/javascript": "\n if (typeof window.interactive_beam_jquery == 'undefined') {\n var jqueryScript = document.createElement('script');\n jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n jqueryScript.type = 'text/javascript';\n jqueryScript.onload = function() {\n var datatableScript = document.createElement('script');\n datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n datatableScript.type = 'text/javascript';\n datatableScript.onload = function() {\n window.interactive_beam_jquery = jQuery.noConflict(true);\n window.interactive_beam_jquery(document).ready(function($){\n \n });\n }\n document.head.appendChild(datatableScript);\n };\n document.head.appendChild(jqueryScript);\n } else {\n window.interactive_beam_jquery(document).ready(function($){\n \n });\n }" |
| }, |
| "metadata": {} |
| }, |
| { |
| "output_type": "display_data", |
| "data": { |
| "text/html": [ |
| "\n", |
| " <link rel=\"stylesheet\" href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\" integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\" crossorigin=\"anonymous\">\n", |
| " <div id=\"progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\" class=\"spinner-border text-info\" role=\"status\">\n", |
| " </div>" |
| ], |
| "text/plain": [ |
| "<IPython.core.display.HTML object>" |
| ] |
| }, |
| "metadata": {} |
| }, |
| { |
| "output_type": "stream", |
| "name": "stderr", |
| "text": [ |
| "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n" |
| ] |
| }, |
| { |
| "output_type": "display_data", |
| "data": { |
| "application/javascript": "\n if (typeof window.interactive_beam_jquery == 'undefined') {\n var jqueryScript = document.createElement('script');\n jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n jqueryScript.type = 'text/javascript';\n jqueryScript.onload = function() {\n var datatableScript = document.createElement('script');\n datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n datatableScript.type = 'text/javascript';\n datatableScript.onload = function() {\n window.interactive_beam_jquery = jQuery.noConflict(true);\n window.interactive_beam_jquery(document).ready(function($){\n \n $(\"#progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\").remove();\n });\n }\n document.head.appendChild(datatableScript);\n };\n document.head.appendChild(jqueryScript);\n } else {\n window.interactive_beam_jquery(document).ready(function($){\n \n $(\"#progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\").remove();\n });\n }" |
| }, |
| "metadata": {} |
| }, |
| { |
| "output_type": "execute_result", |
| "data": { |
| "text/html": [ |
| "<div>\n", |
| "<style scoped>\n", |
| " .dataframe tbody tr th:only-of-type {\n", |
| " vertical-align: middle;\n", |
| " }\n", |
| "\n", |
| " .dataframe tbody tr th {\n", |
| " vertical-align: top;\n", |
| " }\n", |
| "\n", |
| " .dataframe thead th {\n", |
| " text-align: right;\n", |
| " }\n", |
| "</style>\n", |
| "<table border=\"1\" class=\"dataframe\">\n", |
| " <thead>\n", |
| " <tr style=\"text-align: right;\">\n", |
| " <th></th>\n", |
| " <th>timestamp</th>\n", |
| " <th>event</th>\n", |
| " </tr>\n", |
| " </thead>\n", |
| " <tbody>\n", |
| " <tr>\n", |
| " <th>solar_events.csv:0</th>\n", |
| " <td>2021-03-20 09:37:00</td>\n", |
| " <td>March Equinox</td>\n", |
| " </tr>\n", |
| " <tr>\n", |
| " <th>solar_events.csv:1</th>\n", |
| " <td>2021-06-21 03:32:00</td>\n", |
| " <td>June Solstice</td>\n", |
| " </tr>\n", |
| " <tr>\n", |
| " <th>solar_events.csv:2</th>\n", |
| " <td>2021-09-22 19:21:00</td>\n", |
| " <td>September Equinox</td>\n", |
| " </tr>\n", |
| " <tr>\n", |
| " <th>solar_events.csv:3</th>\n", |
| " <td>2021-12-21 15:59:00</td>\n", |
| " <td>December Solstice</td>\n", |
| " </tr>\n", |
| " </tbody>\n", |
| "</table>\n", |
| "</div>" |
| ], |
| "text/plain": [ |
| " timestamp event\n", |
| "solar_events.csv:0 2021-03-20 09:37:00 March Equinox\n", |
| "solar_events.csv:1 2021-06-21 03:32:00 June Solstice\n", |
| "solar_events.csv:2 2021-09-22 19:21:00 September Equinox\n", |
| "solar_events.csv:3 2021-12-21 15:59:00 December Solstice" |
| ] |
| }, |
| "metadata": {}, |
| "execution_count": 3 |
| } |
| ], |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/", |
| "height": 242 |
| }, |
| "id": "sKAMXD5ElhYP", |
| "outputId": "928d9ad7-ae75-42d7-8dc6-8c5afd730b11" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "Collecting a Beam DataFrame into a Pandas DataFrame is useful to perform\n", |
| "[operations not supported by Beam DataFrames](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#classes-of-unsupported-operations).\n", |
| "\n", |
| "For example, let's say we want to take only the first two events in chronological order.\n", |
| "Since a deferred Beam DataFrame does not have any ordering guarantees,\n", |
| "first we need to sort the values.\n", |
| "In Pandas, we could first\n", |
| "[`df.sort_values(by='timestamp')`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.sort_values.html) and then\n", |
| "[`df.head(2)`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.head.html) to achieve this.\n", |
| "\n", |
| "However, these are\n", |
| "[order-sensitive operations](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#order-sensitive-operations)\n", |
| "so using them in a Beam DataFrame raises a\n", |
| "[`WontImplementError`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.frame_base.html#apache_beam.dataframe.frame_base.WontImplementError).\n", |
| "We can work around this by using `collect` to convert the Beam DataFrame into a Pandas DataFrame." |
| ], |
| "metadata": { |
| "id": "t3Is6dArtN_Z" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 4, |
| "source": [ |
| "import apache_beam.runners.interactive.interactive_beam as ib\n", |
| "\n", |
| "# Collect the Beam DataFrame into a Pandas DataFrame.\n", |
| "df = ib.collect(beam_df)\n", |
| "\n", |
| "# We can now use any Pandas transforms with our data.\n", |
| "df.sort_values(by='timestamp').head(2)" |
| ], |
| "outputs": [ |
| { |
| "output_type": "display_data", |
| "data": { |
| "text/html": [ |
| "\n", |
| " <link rel=\"stylesheet\" href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\" integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\" crossorigin=\"anonymous\">\n", |
| " <div id=\"progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\" class=\"spinner-border text-info\" role=\"status\">\n", |
| " </div>" |
| ], |
| "text/plain": [ |
| "<IPython.core.display.HTML object>" |
| ] |
| }, |
| "metadata": {} |
| }, |
| { |
| "output_type": "display_data", |
| "data": { |
| "application/javascript": "\n if (typeof window.interactive_beam_jquery == 'undefined') {\n var jqueryScript = document.createElement('script');\n jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n jqueryScript.type = 'text/javascript';\n jqueryScript.onload = function() {\n var datatableScript = document.createElement('script');\n datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n datatableScript.type = 'text/javascript';\n datatableScript.onload = function() {\n window.interactive_beam_jquery = jQuery.noConflict(true);\n window.interactive_beam_jquery(document).ready(function($){\n \n $(\"#progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\").remove();\n });\n }\n document.head.appendChild(datatableScript);\n };\n document.head.appendChild(jqueryScript);\n } else {\n window.interactive_beam_jquery(document).ready(function($){\n \n $(\"#progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\").remove();\n });\n }" |
| }, |
| "metadata": {} |
| }, |
| { |
| "output_type": "execute_result", |
| "data": { |
| "text/html": [ |
| "<div>\n", |
| "<style scoped>\n", |
| " .dataframe tbody tr th:only-of-type {\n", |
| " vertical-align: middle;\n", |
| " }\n", |
| "\n", |
| " .dataframe tbody tr th {\n", |
| " vertical-align: top;\n", |
| " }\n", |
| "\n", |
| " .dataframe thead th {\n", |
| " text-align: right;\n", |
| " }\n", |
| "</style>\n", |
| "<table border=\"1\" class=\"dataframe\">\n", |
| " <thead>\n", |
| " <tr style=\"text-align: right;\">\n", |
| " <th></th>\n", |
| " <th>timestamp</th>\n", |
| " <th>event</th>\n", |
| " </tr>\n", |
| " </thead>\n", |
| " <tbody>\n", |
| " <tr>\n", |
| " <th>solar_events.csv:0</th>\n", |
| " <td>2021-03-20 09:37:00</td>\n", |
| " <td>March Equinox</td>\n", |
| " </tr>\n", |
| " <tr>\n", |
| " <th>solar_events.csv:1</th>\n", |
| " <td>2021-06-21 03:32:00</td>\n", |
| " <td>June Solstice</td>\n", |
| " </tr>\n", |
| " </tbody>\n", |
| "</table>\n", |
| "</div>" |
| ], |
| "text/plain": [ |
| " timestamp event\n", |
| "solar_events.csv:0 2021-03-20 09:37:00 March Equinox\n", |
| "solar_events.csv:1 2021-06-21 03:32:00 June Solstice" |
| ] |
| }, |
| "metadata": {}, |
| "execution_count": 4 |
| } |
| ], |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/", |
| "height": 138 |
| }, |
| "id": "8haEu6_9iTi7", |
| "outputId": "a1e07bdc-c66d-45e5-efff-90b93219c648" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "> ℹ️ Note that `collect` is _only_ accessible if you’re using\n", |
| "[Interactive Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html)." |
| ], |
| "metadata": { |
| "id": "ZkthQ13pwpm0" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "# Beam DataFrames to PCollections\n", |
| "\n", |
| "If you have your data as a Beam DataFrame, you can convert it into a regular PCollection with\n", |
| "[`to_pcollection`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).\n", |
| "\n", |
| "Converting a Beam DataFrame in this way yields a PCollection with a [schema](https://beam.apache.org/documentation/programming-guide/#what-is-a-schema).\n", |
| "Each element is a named tuple, so we can easily access each property by attribute, for example `element.event` and `element.timestamp`.\n", |
| "\n", |
| "Sometimes it's more convenient to convert the named tuples to Python dictionaries.\n", |
| "We can do that with the\n", |
| "[`_asdict`](https://docs.python.org/3/library/collections.html#collections.somenamedtuple._asdict)\n", |
| "method." |
| ], |
| "metadata": { |
| "id": "ujRm4K0iP8SX" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 5, |
| "source": [ |
| "import apache_beam as beam\n", |
| "from apache_beam.dataframe import convert\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')\n", |
| "\n", |
| "  (\n", |
| "      # Convert the Beam DataFrame to a PCollection.\n", |
| "      convert.to_pcollection(beam_df)\n", |
| "\n", |
| "      # We get named tuples; we can convert them to dictionaries like this.\n", |
| "      | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n", |
| "\n", |
| "      # Print the elements in the PCollection.\n", |
| "      | 'Print' >> beam.Map(print)\n", |
| "  )" |
| ], |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stderr", |
| "text": [ |
| "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n" |
| ] |
| }, |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n", |
| "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n", |
| "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n", |
| "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n" |
| ] |
| } |
| ], |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "g22op8rZPvB3", |
| "outputId": "bba88b0b-4d19-4d61-dac7-2c168998a2e4" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "# Pandas DataFrames to PCollections\n", |
| "\n", |
| "If you have your data as a Pandas DataFrame, you can convert it into a regular PCollection with\n", |
| "[`to_pcollection`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).\n", |
| "\n", |
| "Since Pandas DataFrames are not part of any Beam pipeline, we must provide the `pipeline` explicitly." |
| ], |
| "metadata": { |
| "id": "t6xNIO0iPwtn" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 6, |
| "source": [ |
| "import pandas as pd\n", |
| "import apache_beam as beam\n", |
| "from apache_beam.dataframe import convert\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  df = pd.read_csv('solar_events.csv')\n", |
| "\n", |
| "  (\n", |
| "      # Convert the Pandas DataFrame to a PCollection.\n", |
| "      convert.to_pcollection(df, pipeline=pipeline)\n", |
| "\n", |
| "      # We get named tuples; we can convert them to dictionaries like this.\n", |
| "      | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n", |
| "\n", |
| "      # Print the elements in the PCollection.\n", |
| "      | 'Print' >> beam.Map(print)\n", |
| "  )" |
| ], |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stderr", |
| "text": [ |
| "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n" |
| ] |
| }, |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n", |
| "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n", |
| "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n", |
| "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n" |
| ] |
| } |
| ], |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "YWYVFkvFuksz", |
| "outputId": "a3e3e6fa-85ce-4891-95a0-389fba4461a6" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "If you have your data as a PCollection of Pandas DataFrames, you can flatten them into a PCollection of individual elements with\n", |
| "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n", |
| "\n", |
| "> ℹ️ If the number of elements in each DataFrame can vary widely (that is, some DataFrames might contain thousands of elements while others contain only a handful), it might be a good idea to\n", |
| "> [`Reshuffle`](https://beam.apache.org/documentation/transforms/python/other/reshuffle).\n", |
| "> This rebalances the elements in the PCollection, which helps make sure all the workers have a balanced number of elements." |
| ], |
| "metadata": { |
| "id": "z6Q_tyWszkMC" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 7, |
| "source": [ |
| "import pandas as pd\n", |
| "import apache_beam as beam\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  (\n", |
| "      pipeline\n", |
| "      | 'Filename' >> beam.Create(['solar_events.csv'])\n", |
| "\n", |
| "      # After this step, each element is a Pandas DataFrame, so we can do any Pandas operation.\n", |
| "      | 'Read CSV' >> beam.Map(pd.read_csv)\n", |
| "\n", |
| "      # We yield each row of every DataFrame into a PCollection of dictionaries.\n", |
| "      | 'To dictionaries' >> beam.FlatMap(lambda df: df.to_dict('records'))\n", |
| "\n", |
| "      # Reshuffle to make sure parallelization is balanced.\n", |
| "      | 'Reshuffle' >> beam.Reshuffle()\n", |
| "\n", |
| "      # Print the elements in the PCollection.\n", |
| "      | 'Print' >> beam.Map(print)\n", |
| "  )" |
| ], |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stderr", |
| "text": [ |
| "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n" |
| ] |
| }, |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n", |
| "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n", |
| "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n", |
| "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n" |
| ] |
| } |
| ], |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "fVWjO2Zfziqu", |
| "outputId": "c5db7be4-f764-487a-bc3b-bd5cbad4e396" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "# PCollections to Beam DataFrames\n", |
| "\n", |
| "If you have your data as a PCollection, you can convert it into a deferred Beam DataFrame with\n", |
| "[`to_dataframe`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_dataframe).\n", |
| "\n", |
| "> ℹ️ To convert a PCollection to a Beam DataFrame, each element _must_ have a\n", |
| "[schema](https://beam.apache.org/documentation/programming-guide/#what-is-a-schema)." |
| ], |
| "metadata": { |
| "id": "_Dm2u71EIRFr" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 8, |
| "source": [ |
| "import csv\n", |
| "import apache_beam as beam\n", |
| "from apache_beam.dataframe import convert\n", |
| "\n", |
| "with open('solar_events.csv') as f:\n", |
| "  solar_events = [dict(row) for row in csv.DictReader(f)]\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  pcoll = pipeline | 'Create data' >> beam.Create(solar_events)\n", |
| "\n", |
| "  # Convert the PCollection into a Beam DataFrame.\n", |
| "  beam_df = convert.to_dataframe(pcoll | 'To Rows' >> beam.Map(\n", |
| "      lambda x: beam.Row(\n", |
| "          timestamp=x['timestamp'],\n", |
| "          event=x['event'],\n", |
| "      )\n", |
| "  ))\n", |
| "\n", |
| "  # Print the elements in the Beam DataFrame.\n", |
| "  (\n", |
| "      convert.to_pcollection(beam_df)\n", |
| "      | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n", |
| "      | 'Print' >> beam.Map(print)\n", |
| "  )" |
| ], |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stderr", |
| "text": [ |
| "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n" |
| ] |
| }, |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n", |
| "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n", |
| "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n", |
| "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n" |
| ] |
| } |
| ], |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "N6dVNkkEIWa_", |
| "outputId": "16556170-fbf6-4980-962c-bb466d0b76b2" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "# PCollections to Pandas DataFrames\n", |
| "\n", |
| "If you have your data as a PCollection, you can convert it into an in-memory Pandas DataFrame via a\n", |
| "[side input](https://beam.apache.org/documentation/programming-guide#side-inputs).\n", |
| "\n", |
| "> ℹ️ It's recommended to **only** do this if you need to use a Pandas operation that is\n", |
| "> [not supported in Beam DataFrames](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas/#classes-of-unsupported-operations).\n", |
| "> Converting a PCollection into a Pandas DataFrame consolidates elements from potentially multiple workers into a single worker, which could create a performance bottleneck.\n", |
| "\n", |
| "> ⚠️ Pandas DataFrames are in-memory data structures, so make sure all the elements in the PCollection fit into memory.\n", |
| "> If they don't fit into memory, consider yielding multiple DataFrame elements via\n", |
| "> [`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap)." |
| ], |
| "metadata": { |
| "id": "kj08jOZQQa_q" |
| } |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 9, |
| "source": [ |
| "import csv\n", |
| "import pandas as pd\n", |
| "import apache_beam as beam\n", |
| "\n", |
| "with open('solar_events.csv') as f:\n", |
| "  solar_events = [dict(row) for row in csv.DictReader(f)]\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  pcoll = pipeline | 'Create data' >> beam.Create(solar_events)\n", |
| "\n", |
| "  (\n", |
| "      pipeline\n", |
| "\n", |
| "      # Create a single element containing the entire PCollection.\n", |
| "      | 'Singleton' >> beam.Create([None])\n", |
| "      | 'As Pandas' >> beam.Map(\n", |
| "          lambda _, dict_iter: pd.DataFrame(dict_iter),\n", |
| "          dict_iter=beam.pvalue.AsIter(pcoll),\n", |
| "      )\n", |
| "\n", |
| "      # Print the Pandas DataFrame.\n", |
| "      | 'Print' >> beam.Map(print)\n", |
| "  )" |
| ], |
| "outputs": [ |
| { |
| "output_type": "stream", |
| "name": "stderr", |
| "text": [ |
| "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n" |
| ] |
| }, |
| { |
| "output_type": "stream", |
| "name": "stdout", |
| "text": [ |
| " timestamp event\n", |
| "0 2021-03-20 09:37:00 March Equinox\n", |
| "1 2021-06-21 03:32:00 June Solstice\n", |
| "2 2021-09-22 19:21:00 September Equinox\n", |
| "3 2021-12-21 15:59:00 December Solstice\n" |
| ] |
| } |
| ], |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "cHZdiPbOG-sy", |
| "outputId": "11c84948-fccf-41fd-c276-7c5803264ff7" |
| } |
| }, |
| { |
| "cell_type": "markdown", |
| "source": [ |
| "# What's next?\n", |
| "\n", |
| "* [Beam DataFrames overview](https://beam.apache.org/documentation/dsls/dataframes/overview) -- an overview of the Beam DataFrames API.\n", |
| "* [Differences from pandas](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas) -- goes through some of the differences between Beam DataFrames and Pandas DataFrames, as well as some of the workarounds for unsupported operations.\n", |
| "* [10 minutes to Pandas](https://pandas.pydata.org/pandas-docs/stable/user_guide/10min.html) -- a quickstart guide to Pandas DataFrames.\n", |
| "* [Pandas DataFrame API](https://pandas.pydata.org/pandas-docs/stable/reference/frame.html) -- the API reference for Pandas DataFrames." |
| ], |
| "metadata": { |
| "id": "UflW6AJp6-ss" |
| } |
| } |
| ] |
| } |