{
"nbformat": 4,
"nbformat_minor": 2,
"metadata": {
"colab": {
"name": "Beam DataFrames",
"provenance": [],
"collapsed_sections": [],
"toc_visible": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": [
{
"cell_type": "code",
"execution_count": null,
"source": [
"#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
"\n",
"# Licensed to the Apache Software Foundation (ASF) under one\n",
"# or more contributor license agreements. See the NOTICE file\n",
"# distributed with this work for additional information\n",
"# regarding copyright ownership. The ASF licenses this file\n",
"# to you under the Apache License, Version 2.0 (the\n",
"# \"License\"); you may not use this file except in compliance\n",
"# with the License. You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing,\n",
"# software distributed under the License is distributed on an\n",
"# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
"# KIND, either express or implied. See the License for the\n",
"# specific language governing permissions and limitations\n",
"# under the License."
],
"outputs": [],
"metadata": {
"cellView": "form",
"id": "rz2qIC9IL2rI"
}
},
{
"cell_type": "markdown",
"source": [
"# Beam DataFrames\n",
"\n",
"<button>\n",
" <a href=\"https://beam.apache.org/documentation/dsls/dataframes/overview/\">\n",
" <img src=\"https://beam.apache.org/images/favicon.ico\" alt=\"Open the docs\" height=\"16\"/>\n",
" Beam DataFrames overview\n",
" </a>\n",
"</button>\n",
"\n",
"Beam DataFrames provide a pandas-like [DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n",
"API to declare Beam pipelines.\n",
"\n",
"> ℹ️ To learn more about Beam DataFrames, take a look at the\n",
"[Beam DataFrames overview](https://beam.apache.org/documentation/dsls/dataframes/overview) page.\n",
"\n",
"First, we need to install Apache Beam with the `interactive` extra for the Interactive runner.",
"We also need `pandas` for this notebook, but the Interactive runner already depends on it."
],
"metadata": {
"id": "hDuXLLSZnI1D"
}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"%pip install --quiet apache-beam[interactive]"
],
"outputs": [],
"metadata": {
"id": "8QVByaWjkarZ"
}
},
{
"cell_type": "markdown",
"source": [
"Lets create a small data file of\n",
"[Comma-Separated Values (CSV)](https://en.wikipedia.org/wiki/Comma-separated_values).\n",
"It simply includes the dates of the\n",
"[equinoxes](https://en.wikipedia.org/wiki/Equinox) and\n",
"[solstices](https://en.wikipedia.org/wiki/Solstice)\n",
"of the year 2021."
],
"metadata": {
"id": "aLqdbX4Mgipq"
}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"%%writefile solar_events.csv\n",
"timestamp,event\n",
"2021-03-20 09:37:00,March Equinox\n",
"2021-06-21 03:32:00,June Solstice\n",
"2021-09-22 19:21:00,September Equinox\n",
"2021-12-21 15:59:00,December Solstice"
],
"outputs": [],
"metadata": {
"id": "hZjwAm7qotrJ"
}
},
{
"cell_type": "markdown",
"source": [
"# Interactive Beam\n",
"\n",
"Pandas has the\n",
"[`pandas.read_csv`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html)\n",
"function to easily read CSV files into DataFrames.\n",
"Beam has the\n",
"[`beam.dataframe.io.read_csv`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_csv)\n",
"function that emulates `pandas.read_csv`, but returns a deferred Beam DataFrame.\n",
"\n",
"If you’re using\n",
"[Interactive Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html),\n",
"you can use `collect` to bring a Beam DataFrame into local memory as a Pandas DataFrame."
],
"metadata": {
"id": "Hv_58JulleQ_"
}
},
{
"cell_type": "code",
"execution_count": 3,
"source": [
"import apache_beam as beam\n",
"import apache_beam.runners.interactive.interactive_beam as ib\n",
"from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n",
"\n",
"pipeline = beam.Pipeline(InteractiveRunner())\n",
"\n",
"# Create a deferred Beam DataFrame with the contents of our csv file.\n",
"beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')\n",
"\n",
"# We can use `ib.collect` to view the contents of a Beam DataFrame.\n",
"ib.collect(beam_df)"
],
"outputs": [
{
"output_type": "display_data",
"data": {
"application/javascript": "\n if (typeof window.interactive_beam_jquery == 'undefined') {\n var jqueryScript = document.createElement('script');\n jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n jqueryScript.type = 'text/javascript';\n jqueryScript.onload = function() {\n var datatableScript = document.createElement('script');\n datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n datatableScript.type = 'text/javascript';\n datatableScript.onload = function() {\n window.interactive_beam_jquery = jQuery.noConflict(true);\n window.interactive_beam_jquery(document).ready(function($){\n \n });\n }\n document.head.appendChild(datatableScript);\n };\n document.head.appendChild(jqueryScript);\n } else {\n window.interactive_beam_jquery(document).ready(function($){\n \n });\n }"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/html": [
"\n",
" <link rel=\"stylesheet\" href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\" integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\" crossorigin=\"anonymous\">\n",
" <div id=\"progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\" class=\"spinner-border text-info\" role=\"status\">\n",
" </div>"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {}
},
{
"output_type": "stream",
"name": "stderr",
"text": [
"WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
]
},
{
"output_type": "display_data",
"data": {
"application/javascript": "\n if (typeof window.interactive_beam_jquery == 'undefined') {\n var jqueryScript = document.createElement('script');\n jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n jqueryScript.type = 'text/javascript';\n jqueryScript.onload = function() {\n var datatableScript = document.createElement('script');\n datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n datatableScript.type = 'text/javascript';\n datatableScript.onload = function() {\n window.interactive_beam_jquery = jQuery.noConflict(true);\n window.interactive_beam_jquery(document).ready(function($){\n \n $(\"#progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\").remove();\n });\n }\n document.head.appendChild(datatableScript);\n };\n document.head.appendChild(jqueryScript);\n } else {\n window.interactive_beam_jquery(document).ready(function($){\n \n $(\"#progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\").remove();\n });\n }"
},
"metadata": {}
},
{
"output_type": "execute_result",
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>timestamp</th>\n",
" <th>event</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>solar_events.csv:0</th>\n",
" <td>2021-03-20 09:37:00</td>\n",
" <td>March Equinox</td>\n",
" </tr>\n",
" <tr>\n",
" <th>solar_events.csv:1</th>\n",
" <td>2021-06-21 03:32:00</td>\n",
" <td>June Solstice</td>\n",
" </tr>\n",
" <tr>\n",
" <th>solar_events.csv:2</th>\n",
" <td>2021-09-22 19:21:00</td>\n",
" <td>September Equinox</td>\n",
" </tr>\n",
" <tr>\n",
" <th>solar_events.csv:3</th>\n",
" <td>2021-12-21 15:59:00</td>\n",
" <td>December Solstice</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" timestamp event\n",
"solar_events.csv:0 2021-03-20 09:37:00 March Equinox\n",
"solar_events.csv:1 2021-06-21 03:32:00 June Solstice\n",
"solar_events.csv:2 2021-09-22 19:21:00 September Equinox\n",
"solar_events.csv:3 2021-12-21 15:59:00 December Solstice"
]
},
"metadata": {},
"execution_count": 3
}
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 242
},
"id": "sKAMXD5ElhYP",
"outputId": "928d9ad7-ae75-42d7-8dc6-8c5afd730b11"
}
},
{
"cell_type": "markdown",
"source": [
"Collecting a Beam DataFrame into a Pandas DataFrame is useful to perform\n",
"[operations not supported by Beam DataFrames](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#classes-of-unsupported-operations).\n",
"\n",
"For example, let's say we want to take only the first two events in chronological order.\n",
"Since a deferred Beam DataFrame does not have any ordering guarantees,\n",
"first we need to sort the values.\n",
"In Pandas, we could first\n",
"[`df.sort_values(by='timestamp')`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.sort_values.html) and then\n",
"[`df.head(2)`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.head.html) to achieve this.\n",
"\n",
"However, these are\n",
"[order-sensitive operations](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#order-sensitive-operations)\n",
"so using them in a Beam DataFrame raises a\n",
"[`WontImplementError`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.frame_base.html#apache_beam.dataframe.frame_base.WontImplementError).\n",
"We can work around this by using `collect` to convert the Beam DataFrame into a Pandas DataFrame."
],
"metadata": {
"id": "t3Is6dArtN_Z"
}
},
{
"cell_type": "code",
"execution_count": 4,
"source": [
"import apache_beam.runners.interactive.interactive_beam as ib\n",
"\n",
"# Collect the Beam DataFrame into a Pandas DataFrame.\n",
"df = ib.collect(beam_df)\n",
"\n",
"# We can now use any Pandas transforms with our data.\n",
"df.sort_values(by='timestamp').head(2)"
],
"outputs": [
{
"output_type": "display_data",
"data": {
"text/html": [
"\n",
" <link rel=\"stylesheet\" href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\" integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\" crossorigin=\"anonymous\">\n",
" <div id=\"progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\" class=\"spinner-border text-info\" role=\"status\">\n",
" </div>"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"application/javascript": "\n if (typeof window.interactive_beam_jquery == 'undefined') {\n var jqueryScript = document.createElement('script');\n jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n jqueryScript.type = 'text/javascript';\n jqueryScript.onload = function() {\n var datatableScript = document.createElement('script');\n datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n datatableScript.type = 'text/javascript';\n datatableScript.onload = function() {\n window.interactive_beam_jquery = jQuery.noConflict(true);\n window.interactive_beam_jquery(document).ready(function($){\n \n $(\"#progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\").remove();\n });\n }\n document.head.appendChild(datatableScript);\n };\n document.head.appendChild(jqueryScript);\n } else {\n window.interactive_beam_jquery(document).ready(function($){\n \n $(\"#progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\").remove();\n });\n }"
},
"metadata": {}
},
{
"output_type": "execute_result",
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>timestamp</th>\n",
" <th>event</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>solar_events.csv:0</th>\n",
" <td>2021-03-20 09:37:00</td>\n",
" <td>March Equinox</td>\n",
" </tr>\n",
" <tr>\n",
" <th>solar_events.csv:1</th>\n",
" <td>2021-06-21 03:32:00</td>\n",
" <td>June Solstice</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" timestamp event\n",
"solar_events.csv:0 2021-03-20 09:37:00 March Equinox\n",
"solar_events.csv:1 2021-06-21 03:32:00 June Solstice"
]
},
"metadata": {},
"execution_count": 4
}
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 138
},
"id": "8haEu6_9iTi7",
"outputId": "a1e07bdc-c66d-45e5-efff-90b93219c648"
}
},
{
"cell_type": "markdown",
"source": [
"> ℹ️ Note that `collect` is _only_ accessible if you’re using\n",
"[Interactive Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html)"
],
"metadata": {
"id": "ZkthQ13pwpm0"
}
},
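{
"cell_type": "markdown",
"source": [
"Many element-wise operations and aggregations are supported directly on the deferred Beam DataFrame, so `collect` is only needed to view results locally or to apply an operation Beam can't defer.\n",
"As a small sketch of this (assuming the `solar_events.csv` file and the Interactive Beam setup from above), the cell below filters rows with a pandas-style boolean mask before collecting."
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"import apache_beam as beam\n",
"import apache_beam.runners.interactive.interactive_beam as ib\n",
"from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n",
"\n",
"pipeline = beam.Pipeline(InteractiveRunner())\n",
"beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')\n",
"\n",
"# Element-wise operations like `str.contains` stay deferred, so no collect is needed to build the filter.\n",
"solstices = beam_df[beam_df.event.str.contains('Solstice')]\n",
"\n",
"# Collect only to bring the filtered rows into local memory for viewing.\n",
"ib.collect(solstices)"
],
"outputs": [],
"metadata": {}
},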
{
"cell_type": "markdown",
"source": [
"# Beam DataFrames to PCollections\n",
"\n",
"If you have your data as a Beam DataFrame, you can convert it into a regular PCollection with\n",
"[`to_pcollection`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).\n",
"\n",
"Converting a Beam DataFrame in this way yields a PCollection with a [schema](https://beam.apache.org/documentation/programming-guide/#what-is-a-schema).\n",
"This allows us to easily access each property by attribute, for example `element.event` and `element.timestamp`.\n",
"\n",
"Sometimes it's more convenient to convert the named tuples to Python dictionaries.\n",
"We can do that with the\n",
"[`_asdict`](https://docs.python.org/3/library/collections.html#collections.somenamedtuple._asdict)\n",
"method."
],
"metadata": {
"id": "ujRm4K0iP8SX"
}
},
{
"cell_type": "code",
"execution_count": 5,
"source": [
"import apache_beam as beam\n",
"from apache_beam.dataframe import convert\n",
"\n",
"with beam.Pipeline() as pipeline:\n",
" beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')\n",
"\n",
" (\n",
" # Convert the Beam DataFrame to a PCollection.\n",
" convert.to_pcollection(beam_df)\n",
"\n",
" # We get named tuples, we can convert them to dictionaries like this.\n",
" | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
"\n",
" # Print the elements in the PCollection.\n",
" | 'Print' >> beam.Map(print)\n",
" )"
],
"outputs": [
{
"output_type": "stream",
"name": "stderr",
"text": [
"WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
]
},
{
"output_type": "stream",
"name": "stdout",
"text": [
"{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
"{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
"{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n",
"{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n"
]
}
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "g22op8rZPvB3",
"outputId": "bba88b0b-4d19-4d61-dac7-2c168998a2e4"
}
},
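{
"cell_type": "markdown",
"source": [
"As a small sketch of the attribute access mentioned above (again assuming the `solar_events.csv` file), each element of the converted PCollection is a named tuple, so its fields can be read directly without converting to dictionaries first."
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"import apache_beam as beam\n",
"from apache_beam.dataframe import convert\n",
"\n",
"with beam.Pipeline() as pipeline:\n",
" beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')\n",
"\n",
" (\n",
" # Convert the Beam DataFrame to a PCollection of named tuples.\n",
" convert.to_pcollection(beam_df)\n",
"\n",
" # Access the schema fields by attribute on each element.\n",
" | 'Describe' >> beam.Map(lambda x: f'{x.event} at {x.timestamp}')\n",
"\n",
" # Print the elements in the PCollection.\n",
" | 'Print' >> beam.Map(print)\n",
" )"
],
"outputs": [],
"metadata": {}
},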
{
"cell_type": "markdown",
"source": [
"# Pandas DataFrames to PCollections\n",
"\n",
"If you have your data as a Pandas DataFrame, you can convert it into a regular PCollection with\n",
"[`to_pcollection`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).\n",
"\n",
"Since Pandas DataFrames are not part of any Beam pipeline, we must provide the `pipeline` explicitly."
],
"metadata": {
"id": "t6xNIO0iPwtn"
}
},
{
"cell_type": "code",
"execution_count": 6,
"source": [
"import pandas as pd\n",
"import apache_beam as beam\n",
"from apache_beam.dataframe import convert\n",
"\n",
"with beam.Pipeline() as pipeline:\n",
" df = pd.read_csv('solar_events.csv')\n",
"\n",
" (\n",
" # Convert the Pandas DataFrame to a PCollection.\n",
" convert.to_pcollection(df, pipeline=pipeline)\n",
"\n",
" # We get named tuples, we can convert them to dictionaries like this.\n",
" | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
"\n",
" # Print the elements in the PCollection.\n",
" | 'Print' >> beam.Map(print)\n",
" )"
],
"outputs": [
{
"output_type": "stream",
"name": "stderr",
"text": [
"WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
]
},
{
"output_type": "stream",
"name": "stdout",
"text": [
"{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
"{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
"{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n",
"{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n"
]
}
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "YWYVFkvFuksz",
"outputId": "a3e3e6fa-85ce-4891-95a0-389fba4461a6"
}
},
{
"cell_type": "markdown",
"source": [
"If you have your data as a PCollection of Pandas DataFrames, you can convert them into a PCollection with\n",
"[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
"\n",
"> ℹ️ If the number of elements in each DataFrame can be very different (that is, some DataFrames might contain thousands of elements while others contain only a handful of elements), it might be a good idea to\n",
"> [`Reshuffle`](https://beam.apache.org/documentation/transforms/python/other/reshuffle).\n",
"> This basically rebalances the elements in the PCollection, which helps make sure all the workers have a balanced number of elements."
],
"metadata": {
"id": "z6Q_tyWszkMC"
}
},
{
"cell_type": "code",
"execution_count": 7,
"source": [
"import pandas as pd\n",
"import apache_beam as beam\n",
"\n",
"with beam.Pipeline() as pipeline:\n",
" (\n",
" pipeline\n",
" | 'Filename' >> beam.Create(['solar_events.csv'])\n",
"\n",
" # Each element is a Pandas DataFrame, so we can do any Pandas operation.\n",
" | 'Read CSV' >> beam.Map(pd.read_csv)\n",
"\n",
" # We yield each element of all the DataFrames into a PCollection of dictionaries.\n",
" | 'To dictionaries' >> beam.FlatMap(lambda df: df.to_dict('records'))\n",
"\n",
" # Reshuffle to make sure parallelization is balanced.\n",
" | 'Reshuffle' >> beam.Reshuffle()\n",
"\n",
" # Print the elements in the PCollection.\n",
" | 'Print' >> beam.Map(print)\n",
" )"
],
"outputs": [
{
"output_type": "stream",
"name": "stderr",
"text": [
"WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
]
},
{
"output_type": "stream",
"name": "stdout",
"text": [
"{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
"{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
"{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n",
"{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n"
]
}
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "fVWjO2Zfziqu",
"outputId": "c5db7be4-f764-487a-bc3b-bd5cbad4e396"
}
},
{
"cell_type": "markdown",
"source": [
"# PCollections to Beam DataFrames\n",
"\n",
"If you have your data as a PCollection, you can convert it into a deferred Beam DataFrame with\n",
"[`to_dataframe`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_dataframe).\n",
"\n",
"> ℹ️ To convert a PCollection to a Beam DataFrame, each element _must_ have a\n",
"[schema](https://beam.apache.org/documentation/programming-guide/#what-is-a-schema)."
],
"metadata": {
"id": "_Dm2u71EIRFr"
}
},
{
"cell_type": "code",
"execution_count": 8,
"source": [
"import csv\n",
"import apache_beam as beam\n",
"from apache_beam.dataframe import convert\n",
"\n",
"with open('solar_events.csv') as f:\n",
" solar_events = [dict(row) for row in csv.DictReader(f)]\n",
"\n",
"with beam.Pipeline() as pipeline:\n",
" pcoll = pipeline | 'Create data' >> beam.Create(solar_events)\n",
"\n",
" # Convert the PCollection into a Beam DataFrame\n",
" beam_df = convert.to_dataframe(pcoll | 'To Rows' >> beam.Map(\n",
" lambda x: beam.Row(\n",
" timestamp=x['timestamp'],\n",
" event=x['event'],\n",
" )\n",
" ))\n",
"\n",
" # Print the elements in the Beam DataFrame.\n",
" (\n",
" convert.to_pcollection(beam_df)\n",
" | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
" | 'Print' >> beam.Map(print)\n",
" )"
],
"outputs": [
{
"output_type": "stream",
"name": "stderr",
"text": [
"WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
]
},
{
"output_type": "stream",
"name": "stdout",
"text": [
"{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
"{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
"{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n",
"{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n"
]
}
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "N6dVNkkEIWa_",
"outputId": "16556170-fbf6-4980-962c-bb466d0b76b2"
}
},
{
"cell_type": "markdown",
"source": [
"# PCollections to Pandas DataFrames\n",
"\n",
"If you have your data as a PCollection, you can convert it into an in-memory Pandas DataFrame via a\n",
"[side input](https://beam.apache.org/documentation/programming-guide#side-inputs).\n",
"\n",
"> ℹ️ It's recommended to **only** do this if you need to use a Pandas operation that is\n",
"> [not supported in Beam DataFrames](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas/#classes-of-unsupported-operations).\n",
"> Converting a PCollection into a Pandas DataFrame consolidates elements from potentially multiple workers into a single worker, which could create a performance bottleneck.\n"
"\n",
"> ⚠️ Pandas DataFrames are in-memory data structures, so make sure all the elements in the PCollection fit into memory.\n",
"> If they don't fit into memory, consider yielding multiple DataFrame elements via\n",
"> [`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap)."
],
"metadata": {
"id": "kj08jOZQQa_q"
}
},
{
"cell_type": "code",
"execution_count": 9,
"source": [
"import csv\n",
"import pandas as pd\n",
"import apache_beam as beam\n",
"\n",
"with open('solar_events.csv') as f:\n",
" solar_events = [dict(row) for row in csv.DictReader(f)]\n",
"\n",
"with beam.Pipeline() as pipeline:\n",
" pcoll = pipeline | 'Create data' >> beam.Create(solar_events)\n",
"\n",
" (\n",
" pipeline\n",
"\n",
" # Create a single element containing the entire PCollection. \n",
" | 'Singleton' >> beam.Create([None])\n",
" | 'As Pandas' >> beam.Map(\n",
" lambda _, dict_iter: pd.DataFrame(dict_iter),\n",
" dict_iter=beam.pvalue.AsIter(pcoll),\n",
" )\n",
"\n",
" # Print the Pandas DataFrame.\n",
" | 'Print' >> beam.Map(print)\n",
" )"
],
"outputs": [
{
"output_type": "stream",
"name": "stderr",
"text": [
"WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
]
},
{
"output_type": "stream",
"name": "stdout",
"text": [
" timestamp event\n",
"0 2021-03-20 09:37:00 March Equinox\n",
"1 2021-06-21 03:32:00 June Solstice\n",
"2 2021-09-22 19:21:00 September Equinox\n",
"3 2021-12-21 15:59:00 December Solstice\n"
]
}
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "cHZdiPbOG-sy",
"outputId": "11c84948-fccf-41fd-c276-7c5803264ff7"
}
},
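{
"cell_type": "markdown",
"source": [
"If the elements don't all fit into a single in-memory DataFrame, one way to realize the idea from the note above is to batch the PCollection and build one smaller Pandas DataFrame per batch.\n",
"This is only a sketch (assuming the same `solar_events.csv` data): it uses `BatchElements` followed by `Map` rather than a hand-written `FlatMap`, and the tiny batch size is purely for illustration."
],
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"source": [
"import csv\n",
"import pandas as pd\n",
"import apache_beam as beam\n",
"\n",
"with open('solar_events.csv') as f:\n",
" solar_events = [dict(row) for row in csv.DictReader(f)]\n",
"\n",
"with beam.Pipeline() as pipeline:\n",
" (\n",
" pipeline\n",
" | 'Create data' >> beam.Create(solar_events)\n",
"\n",
" # Group elements into small batches so each DataFrame stays small.\n",
" | 'Batch' >> beam.BatchElements(min_batch_size=2, max_batch_size=2)\n",
"\n",
" # Each batch (a list of dictionaries) becomes its own Pandas DataFrame.\n",
" | 'To DataFrames' >> beam.Map(pd.DataFrame)\n",
"\n",
" # Print each smaller DataFrame.\n",
" | 'Print' >> beam.Map(print)\n",
" )"
],
"outputs": [],
"metadata": {}
},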
{
"cell_type": "markdown",
"source": [
"# What's next?\n",
"\n",
"* [Beam DataFrames overview](https://beam.apache.org/documentation/dsls/dataframes/overview) -- an overview of the Beam DataFrames API.\n",
"* [Differences from pandas](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas) -- goes through some of the differences between Beam DataFrames and Pandas DataFrames, as well as some of the workarounds for unsupported operations.\n",
"* [10 minutes to Pandas](https://pandas.pydata.org/pandas-docs/stable/user_guide/10min.html) -- a quickstart guide to Pandas DataFrames.\n",
"* [Pandas DataFrame API](https://pandas.pydata.org/pandas-docs/stable/reference/frame.html) -- the API reference for Pandas DataFrames"
],
"metadata": {
"id": "UflW6AJp6-ss"
}
}
]
}