| { |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "colab_type": "text", |
| "id": "view-in-github" |
| }, |
| "source": [ |
| "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_windowing_by_doing.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "cellView": "form", |
| "id": "L7ZbRufePd2g" |
| }, |
| "outputs": [], |
| "source": [ |
| "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", |
| "\n", |
| "# Licensed to the Apache Software Foundation (ASF) under one\n", |
| "# or more contributor license agreements. See the NOTICE file\n", |
| "# distributed with this work for additional information\n", |
| "# regarding copyright ownership. The ASF licenses this file\n", |
| "# to you under the Apache License, Version 2.0 (the\n", |
| "# \"License\"); you may not use this file except in compliance\n", |
| "# with the License. You may obtain a copy of the License at\n", |
| "#\n", |
| "# http://www.apache.org/licenses/LICENSE-2.0\n", |
| "#\n", |
| "# Unless required by applicable law or agreed to in writing,\n", |
| "# software distributed under the License is distributed on an\n", |
| "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", |
| "# KIND, either express or implied. See the License for the\n", |
| "# specific language governing permissions and limitations\n", |
| "# under the License." |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "83TJhNxLD7-W" |
| }, |
| "source": [ |
| " # **Introduction to Windowing for Batch Processing in Apache Beam**\n", |
| "\n", |
| "In this notebook, we will learn the fundamentals of **batch processing** as we walk through a few introductory examples in Beam. \n", |
| "The pipelines in these examples process real-world data for air quality levels in India between 2017 and 2022.\n", |
| "\n", |
| "After this tutorial you should have a basic understanding of the following:\n", |
| "\n", |
| "* What is **batch vs. stream** data processing?\n", |
| "* How can I use Beam to run a **simple batch analysis job**?\n", |
| "* How can I use Beam's **windowing features** to process only certain intervals of data at a time?" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "Dj3ftRRqfumW" |
| }, |
| "source": [ |
| "To begin, run the following cell to set up Apache Beam." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "zmJ0pCmSvD0-", |
| "outputId": "9041f637-12a0-4f78-f60b-ebd3c3a1c186" |
| }, |
| "outputs": [], |
| "source": [ |
| "# Install apache-beam.\n", |
| "!pip install --quiet apache-beam" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "7sBoLahzPlJ1" |
| }, |
| "outputs": [], |
| "source": [ |
| "# Set the logging level to reduce verbose information\n", |
| "import logging\n", |
| "\n", |
| "logging.root.setLevel(logging.ERROR)" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "BB6FAwYj1dAi" |
| }, |
| "source": [ |
| "<hr style=\"border: 5px solid #003262;\" />\n", |
| "<hr style=\"border: 1px solid #fdb515;\" />\n", |
| "\n", |
| "## Batch vs. Stream Data Processing\n", |
| "\n", |
| "What's the difference?\n", |
| "\n", |
| "**Batch processing** is when data processing and analysis happens on a set of data that has already been stored over a period of time. \n", |
| "In other words, the input is a finite, bounded data set. \n", |
| "Examples include payroll and billing systems, which have to be processed weekly or monthly.\n", |
| "\n", |
| "**Stream processing** happens *as* data flows through a system. This results in analysis and reporting of events \n", |
| "within a short period of time or near real-time on an infinite, unbounded data set. \n", |
| "Examples include fraud detection and intrusion detection, which require the continuous processing of transaction data.\n", |
| "\n", |
| "> This tutorial will focus on **batch processing** examples. \n", |
| "To learn more about stream processing in Beam, check out the [Python Streaming](https://beam.apache.org/documentation/sdks/python-streaming/) page." |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "W_63UtsoBRql" |
| }, |
| "source": [ |
| "<hr style=\"border: 5px solid #003262;\" />\n", |
| "\n", |
| "## Load the Data\n", |
| "\n", |
| "Let's import the example data we will be using throughout this tutorial. The [dataset](https://www.kaggle.com/datasets/fedesoriano/air-quality-data-in-india) \n", |
| "consists of **hourly air quality data (PM 2.5) in India from November 2017 to June 2022**.\n", |
| "\n", |
| "> The World Health Organization (WHO) reports 7 million premature deaths linked to air pollution each year. \n", |
| "In India alone, more than 90% of the country's population live in areas where air quality is below the WHO's standards.\n", |
| "\n", |
| "**What does the data look like?**\n", |
| "\n", |
| "The data set has 36,192 rows and 6 columns in total recording the following attributes:\n", |
| "\n", |
| "1. `Timestamp`: Timestamp in the format YYYY-MM-DD HH:MM:SS\n", |
| "2. `Year`: Year of the measure\n", |
| "3. `Month`: Month of the measure\n", |
| "4. `Day`: Day of the measure\n", |
| "5. `Hour`: Hour of the measure\n", |
| "6. `PM2.5`: Fine particulate matter air pollutant level in air\n", |
| "\n", |
| "**For our purposes, we will focus on only the first and last columns of the** `air_quality` **DataFrame**:\n", |
| "\n", |
| "1. `Timestamp`: Timestamp in the format YYYY-MM-DD HH:MM:SS\n", |
| "2. `PM2.5`: Fine particulate matter air pollutant level in air\n", |
| "\n", |
| "Run the following cell to load the data into our file directory." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "GTteBUZ-7e2s", |
| "outputId": "3af9cdb0-c248-4c6d-96f6-c3739fb66014" |
| }, |
| "outputs": [], |
| "source": [ |
| "# Copy the dataset file into the local file system from Google Cloud Storage.\n", |
| "!mkdir -p data\n", |
| "!gsutil cp gs://batch-processing-example/air-quality-india.csv data/" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "1NcmPl7C43lY" |
| }, |
| "source": [ |
| "#### Data Preparation\n", |
| "\n", |
| "Before we load the data into a Beam pipeline, let's use Pandas to select two columns." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/", |
| "height": 206 |
| }, |
| "id": "dq-k7hwRf4MA", |
| "outputId": "7d70a959-5278-453e-9315-f5ed06821744" |
| }, |
| "outputs": [], |
| "source": [ |
| "# Load the data into a Python Pandas DataFrame.\n", |
| "import pandas as pd\n", |
| "\n", |
| "air_quality = pd.read_csv(\"data/air-quality-india.csv\")\n", |
| "air_quality.head()" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/", |
| "height": 237 |
| }, |
| "id": "WNXrvP-wDIkA", |
| "outputId": "3e932987-41b3-4aaf-b49f-3707a9728322" |
| }, |
| "outputs": [], |
| "source": [ |
| "import csv\n", |
| "\n", |
| "# Select only the two columns of the DataFrame we're interested in.\n", |
| "airq = air_quality.loc[:, [\"Timestamp\", \"PM2.5\"]].set_index(\"Timestamp\")\n", |
| "# Save them to a new CSV file for the Beam pipelines to read.\n", |
| "airq.to_csv(\"data/air_quality.csv\")\n", |
| "airq.head()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "VRFkb_sLDUCD" |
| }, |
| "source": [ |
| "<hr style=\"border: 5px solid #003262;\" />\n", |
| "\n", |
| "# 1. Average Air Quality Index (AQI)\n", |
| "\n", |
| "Before we explore more advanced batch processing with different types of windowing, we will start with a simple batch analysis example.\n", |
| "\n", |
| "Our **objective** is to analyze the *entire* dataset to find the **average PM2.5 air quality index** in India across the entire 11/2017-6/2022 period.\n", |
| "\n", |
| "> This example uses the `GlobalWindow`, which is a single window that covers the entire PCollection. \n", |
| "All pipelines use the [`GlobalWindow`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.GlobalWindow) by default. \n", |
| "In many cases, especially for batch pipelines, this is what we want since we want to analyze all the data that we have." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/", |
| "height": 34 |
| }, |
| "id": "v06NFe9sDYXc", |
| "outputId": "f65eae63-0424-4ac0-8609-78e98ac21bd0" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "\n", |
| "def parse_file(element):\n", |
| "  file = csv.reader([element], quotechar='\"', delimiter=',',\n", |
| "                    quoting=csv.QUOTE_ALL, skipinitialspace=True)\n", |
| "  for line in file:\n", |
| "    return line\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| "  (\n", |
| "      pipeline\n", |
| "      | 'Read input file' >> beam.io.ReadFromText(\"data/air_quality.csv\",\n", |
| "                                                 skip_header_lines=1)\n", |
| "      | 'Parse file' >> beam.Map(parse_file)\n", |
| "      | 'Get PM' >> beam.Map(lambda x: float(x[1]))  # only process PM2.5\n", |
| "      | 'Get mean value' >> beam.combiners.Mean.Globally()\n", |
| "      | beam.Map(print))" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "GmHEE1G5Y1z-", |
| "outputId": "248ee3d7-43af-4b53-9832-8da0eb7ac974" |
| }, |
| "outputs": [], |
| "source": [ |
| "# Verify that the mean value above matches what Pandas produces.\n", |
| "airq[\"PM2.5\"].mean()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "b3gGxC6w6qXx" |
| }, |
| "source": [ |
| "**Congratulations!** You just created a simple aggregation processing pipeline in batch using `GlobalWindow`." |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "vRameihqDJ8l" |
| }, |
| "source": [ |
| "<hr style=\"border: 5px solid #003262;\" />\n", |
| "\n", |
| "# 2. Advanced Processing in Batch with Windowing\n", |
| "\n", |
| "Sometimes we want to apply an [aggregation](https://beam.apache.org/documentation/transforms/python/overview/#aggregation), such as `GroupByKey` or `Combine`, \n", |
| "only over certain intervals, such as hourly or daily, instead of processing the entire `PCollection` at once.\n", |
| "\n", |
| "In this case, our **objective** is to determine the **fluctuations of air quality every 30 days**.\n", |
| "\n", |
| "**_Windows_** in Beam allow us to process only certain data intervals at a time.\n", |
| "In this notebook, we will go through different ways of windowing our pipeline.\n", |
| "\n", |
| "We have already been introduced to the default `GlobalWindow` (see above), which covers the entire `PCollection`. \n", |
| "Now we will dive into **fixed time windows, sliding time windows, and session windows**.\n", |
| "\n", |
| "> We also recommend working through [another windowing tutorial](https://colab.sandbox.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/windowing.ipynb) that uses a toy dataset." |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "gj0_S5Ka3-zb" |
| }, |
| "source": [ |
| "### First, we need to convert timestamps to Unix time\n", |
| "\n", |
| "Apache Beam requires us to provide the timestamp as Unix time. Let us write a simple time conversion function:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "nKBYsxFg4SIa" |
| }, |
| "outputs": [], |
| "source": [ |
| "import time\n", |
| "\n", |
| "# Converts a timestamp string such as '2021-10-14 14:00:00' into Unix time\n", |
| "# (seconds since the epoch). Pass a different time_format to parse other layouts.\n", |
| "# Note: time.mktime interprets the parsed time in the machine's local time zone.\n", |
| "def to_unix_time(time_str: str, time_format='%Y-%m-%d %H:%M:%S') -> int:\n", |
| "  \"\"\"Converts a time string into Unix time.\"\"\"\n", |
| "  time_tuple = time.strptime(time_str, time_format)\n", |
| "  return int(time.mktime(time_tuple))" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "_mPge0KdRx20", |
| "outputId": "43475bbe-548a-4817-ed0b-534cebbe70ce" |
| }, |
| "outputs": [], |
| "source": [ |
| "to_unix_time('2021-10-14 14:00:00')" |
| ] |
| }, |
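| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "The helper above relies on `time.mktime`, so its result depends on the time zone of the machine that runs the notebook. \n", |
| "As a minimal sketch, assuming the timestamps should be treated as UTC (an assumption; this notebook does not state the dataset's time zone), a time-zone-independent variant could look like the cell below. \n", |
| "The name `to_unix_time_utc` is hypothetical and is not used elsewhere in this notebook." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from datetime import datetime, timezone\n", |
| "\n", |
| "\n", |
| "def to_unix_time_utc(time_str: str, time_format: str = '%Y-%m-%d %H:%M:%S') -> int:\n", |
| "    \"\"\"Converts a time string into Unix time, treating the string as UTC.\"\"\"\n", |
| "    naive = datetime.strptime(time_str, time_format)\n", |
| "    # Attach the UTC time zone explicitly so the local time zone is never consulted.\n", |
| "    return int(naive.replace(tzinfo=timezone.utc).timestamp())\n", |
| "\n", |
| "\n", |
| "print(to_unix_time_utc('2021-10-14 14:00:00'))" |
| ] |
| }, |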
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "lL0_QONF1aMH" |
| }, |
| "source": [ |
| "### Second, let us define some helper functions to develop our pipeline\n", |
| "\n", |
| "In this code, we have a transform (`PrintElementInfo`) to help us analyze an element alongside its window information, \n", |
| "and we have another transform (`PrintWindowInfo`) to help us analyze the mean PM2.5 value of the elements that landed in each window.\n", |
| "We use a custom [`DoFn`](https://beam.apache.org/documentation/transforms/python/elementwise/pardo)\n", |
| "to access that information." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "KtPL-echb2xv" |
| }, |
| "outputs": [], |
| "source": [ |
| "#@title Helper functions to develop our pipeline\n", |
| "\n", |
| "def human_readable_window(window) -> str:\n", |
| "  \"\"\"Formats a window object into a human readable string.\"\"\"\n", |
| "  if isinstance(window, beam.window.GlobalWindow):\n", |
| "    return str(window)\n", |
| "  return f'{window.start.to_utc_datetime()} - {window.end.to_utc_datetime()}'\n", |
| "\n", |
| "class PrintElementInfo(beam.DoFn):\n", |
| "  \"\"\"Prints an element with its Window information for debugging.\"\"\"\n", |
| "  def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):\n", |
| "    print(f'[{human_readable_window(window)}] {timestamp.to_utc_datetime()} -- {element}')\n", |
| "    yield element\n", |
| "\n", |
| "@beam.ptransform_fn\n", |
| "def PrintWindowInfo(pcollection):\n", |
| "  \"\"\"Prints the Window information with AQI in that window for debugging.\"\"\"\n", |
| "  class PrintAQI(beam.DoFn):\n", |
| "    def process(self, mean_elements, window=beam.DoFn.WindowParam):\n", |
| "      print(f'>> Window [{human_readable_window(window)}], AQI: {mean_elements}')\n", |
| "      yield mean_elements\n", |
| "\n", |
| "  return (\n", |
| "      pcollection\n", |
| "      | 'Mean per window' >> beam.combiners.Mean.Globally().without_defaults()\n", |
| "      | 'Print mean info' >> beam.ParDo(PrintAQI())\n", |
| "  )" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "dtQbcRU6XYCr" |
| }, |
| "source": [ |
| "Note: when you run the code below, pay attention to how the human-readable windows vary for each window type.\n", |
| "\n", |
| "You can also use the built-in [`LogElements`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.LogElements) \n", |
| "PTransform to print the elements with the timestamp and window information. \n", |
| "\n", |
| "To illustrate how windowing works, we will use the toy data below:" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "oOLx4IsZXoTO" |
| }, |
| "outputs": [], |
| "source": [ |
| "# Toy data: [timestamp, PM2.5] pairs.\n", |
| "air_toy_data = [\n", |
| " ['2021-10-14 14:00:00', '43.27'],\n", |
| " ['2021-10-14 15:00:00', '44.17'],\n", |
| " ['2021-10-14 16:00:00', '48.77'],\n", |
| " ['2021-10-14 17:00:00', '55.57'],\n", |
| " ['2021-10-14 18:00:00', '56.95'],\n", |
| " ['2021-10-21 09:00:00', '36.77'],\n", |
| " ['2021-10-21 10:00:00', '34.87'],\n", |
| " ['2021-11-17 01:00:00', '62.64'],\n", |
| " ['2021-11-17 02:00:00', '65.28'],\n", |
| " ['2021-11-17 03:00:00', '65.53'],\n", |
| " ['2021-11-17 04:00:00', '70.18'],\n", |
| " ['2021-12-11 21:00:00', '69.07'],\n", |
| " ['2022-01-02 21:00:00', '76.56'],\n", |
| " ['2022-01-02 22:00:00', '78.77'],\n", |
| " ['2022-01-02 23:00:00', '73.16'],\n", |
| " ['2022-01-03 03:00:00', '74.05'],\n", |
| " ['2022-01-03 19:00:00', '100.28'],\n", |
| " ['2022-01-03 22:00:00', '80.92'],\n", |
| " ['2022-01-04 05:00:00', '80.48'],\n", |
| " ['2022-01-04 07:00:00', '84.0'],\n", |
| " ['2022-01-04 18:00:00', '95.49'],\n", |
| " ['2022-01-05 00:00:00', '69.01'],\n", |
| " ['2022-01-05 07:00:00', '76.85'],]" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "_QoStuYV-uku" |
| }, |
| "source": [ |
| "### Fixed time windows\n", |
| "\n", |
| "`FixedWindows` allow us to create fixed-sized windows with evenly spaced intervals.\n", |
| "We only need to specify the _window size_ in seconds.\n", |
| "\n", |
| "In Python, we can use [`timedelta`](https://docs.python.org/3/library/datetime.html#timedelta-objects)\n", |
| "to convert minutes, hours, or days into seconds.\n", |
| "\n", |
| "We then use the [`WindowInto`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html?highlight=windowinto#apache_beam.transforms.core.WindowInto)\n", |
| "transform to apply the kind of window we want." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "Q7Q4yVzh8XWO", |
| "outputId": "076752f2-1b40-4d07-9419-88757adc99be" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "from datetime import timedelta\n", |
| "\n", |
| "\n", |
| "# We first set the window size to around 1 month.\n", |
| "window_size = timedelta(days=30).total_seconds() # in seconds\n", |
| "\n", |
| "\n", |
| "# Let us set up the windowed pipeline and compute AQI every 30 days\n", |
| "with beam.Pipeline() as pipeline:\n", |
| " (\n", |
| " pipeline\n", |
| " | beam.Create(air_toy_data)\n", |
| " | 'With timestamps' >> beam.MapTuple(\n", |
| " lambda timestamp, element:\n", |
| " beam.window.TimestampedValue(float(element), to_unix_time(timestamp))\n", |
| " )\n", |
| " | 'Fixed windows' >> beam.WindowInto(beam.window.FixedWindows(window_size))\n", |
| " | 'Print element info' >> beam.ParDo(PrintElementInfo())\n", |
| " | 'Print window info' >> PrintWindowInfo()\n", |
| " )" |
| ] |
| }, |
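| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "To see why an element lands in a particular fixed window, the assignment rule can be sketched in plain Python without Beam. \n", |
| "This is a simplified sketch, assuming windows are aligned to the Unix epoch with no offset (the default for `FixedWindows`). \n", |
| "The helper name `fixed_window_bounds` is hypothetical and is not part of Beam." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from datetime import datetime, timedelta, timezone\n", |
| "\n", |
| "\n", |
| "def fixed_window_bounds(unix_ts: int, size: int) -> tuple:\n", |
| "    \"\"\"Returns the [start, end) bounds of the fixed window containing unix_ts.\"\"\"\n", |
| "    # Round the timestamp down to the nearest multiple of the window size.\n", |
| "    start = unix_ts - unix_ts % size\n", |
| "    return start, start + size\n", |
| "\n", |
| "\n", |
| "size = int(timedelta(days=30).total_seconds())\n", |
| "ts = 1634220000  # 2021-10-14 14:00:00 UTC\n", |
| "start, end = fixed_window_bounds(ts, size)\n", |
| "print(datetime.fromtimestamp(start, tz=timezone.utc), '-',\n", |
| "      datetime.fromtimestamp(end, tz=timezone.utc))" |
| ] |
| }, |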
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "N-bqgv5K_INW" |
| }, |
| "source": [ |
| "### Sliding time windows\n", |
| "\n", |
| "[`Sliding windows`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.SlidingWindows)\n", |
| "allow us to compute AQI every 15 days, with each window covering the previous 30 days.\n", |
| "We specify the _window size_ in seconds, just like with `FixedWindows`. \n", |
| "We also need to specify a _window period_ in seconds, which is how often a new window starts. Because the period is shorter than the size, consecutive windows overlap." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "Lrj1mb6Q_LW7", |
| "outputId": "4418b5bd-6dc0-44a3-ca1a-d231ec9af9e1" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "from datetime import timedelta\n", |
| "\n", |
| "# Sliding windows of 30 days, with a new window starting every 15 days.\n", |
| "window_size = timedelta(days=30).total_seconds() # in seconds\n", |
| "window_period = timedelta(days=15).total_seconds() # in seconds\n", |
| "print(f'window_size: {window_size} seconds')\n", |
| "print(f'window_period: {window_period} seconds')\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| " (\n", |
| " pipeline\n", |
| " | 'Air Quality' >> beam.Create(air_toy_data)\n", |
| " | 'With timestamps' >> beam.MapTuple(\n", |
| " lambda timestamp, element:\n", |
| " beam.window.TimestampedValue(float(element), to_unix_time(timestamp))\n", |
| " )\n", |
| " | 'Sliding windows' >> beam.WindowInto(\n", |
| " beam.window.SlidingWindows(window_size, window_period)\n", |
| " )\n", |
| " | 'Print element info' >> beam.ParDo(PrintElementInfo())\n", |
| " | 'Print window info' >> PrintWindowInfo()\n", |
| " )" |
| ] |
| }, |
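| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "With sliding windows, each element belongs to more than one window, which is why every element is printed several times above. \n", |
| "The following plain-Python sketch lists the windows that contain a given timestamp, assuming windows are aligned to the Unix epoch with no offset (the default for `SlidingWindows`). \n", |
| "The helper name `sliding_window_starts` is hypothetical and is not part of Beam." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from datetime import timedelta\n", |
| "\n", |
| "\n", |
| "def sliding_window_starts(unix_ts: int, size: int, period: int) -> list:\n", |
| "    \"\"\"Returns the start times of all sliding windows that contain unix_ts.\"\"\"\n", |
| "    # The latest window containing the timestamp starts at a multiple of the period.\n", |
| "    last_start = unix_ts - unix_ts % period\n", |
| "    # Earlier windows start one period apart, as long as they still cover unix_ts.\n", |
| "    return list(range(last_start, last_start - size, -period))\n", |
| "\n", |
| "\n", |
| "size = int(timedelta(days=30).total_seconds())\n", |
| "period = int(timedelta(days=15).total_seconds())\n", |
| "ts = 1634220000  # 2021-10-14 14:00:00 UTC\n", |
| "starts = sliding_window_starts(ts, size, period)\n", |
| "print(f'{len(starts)} windows contain the timestamp, starting at: {starts}')" |
| ] |
| }, |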
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "Y_SbevXv_MDk" |
| }, |
| "source": [ |
| "### Session time windows\n", |
| "\n", |
| "Maybe we don't want regular windows, but instead, have the windows reflect periods where activity happened. [`Sessions`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.Sessions)\n", |
| "allow us to create those windows.\n", |
| "We only need to specify a _gap size_ in seconds, which is how long a period of inactivity must last before a session window closes. \n", |
| "In this case, if no event happens within 10 days, the current session window closes and is emitted. \n", |
| "A new session window is created whenever the next event happens." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "zgNc-c3THB7G", |
| "outputId": "d8d5fed1-8748-4845-cbc3-0b898ce4bcd8" |
| }, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "from datetime import timedelta\n", |
| "\n", |
| "# Sessions separated by at least 10 days of inactivity.\n", |
| "gap_size = timedelta(days=10).total_seconds() # in seconds\n", |
| "print(f'gap_size: {gap_size} seconds')\n", |
| "\n", |
| "with beam.Pipeline() as pipeline:\n", |
| " (\n", |
| " pipeline\n", |
| " | 'Air Quality' >> beam.Create(air_toy_data)\n", |
| " | 'With timestamps' >> beam.MapTuple(\n", |
| " lambda timestamp, element:\n", |
| " beam.window.TimestampedValue(float(element), to_unix_time(timestamp))\n", |
| " )\n", |
| " | 'Session windows' >> beam.WindowInto(beam.window.Sessions(gap_size))\n", |
| " | 'Print element info' >> beam.ParDo(PrintElementInfo())\n", |
| " | 'Print window info' >> PrintWindowInfo()\n", |
| " )" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "XcX0t6hya85F" |
| }, |
| "source": [ |
| "Note how the above windows are irregular." |
| ] |
| }, |
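| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "One way to think about session windows: each element starts its own window that lasts for the gap size, and windows that overlap are merged into one session. \n", |
| "The plain-Python sketch below illustrates that idea. It is not Beam's implementation; the function name `session_windows` and the day offsets are made up for illustration." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "def session_windows(timestamps, gap):\n", |
| "    \"\"\"Groups timestamps into sessions, returned as (start, end) pairs.\"\"\"\n", |
| "    sessions = []\n", |
| "    for ts in sorted(timestamps):\n", |
| "        if sessions and ts < sessions[-1][1]:\n", |
| "            # The element falls inside the current session, so extend it.\n", |
| "            sessions[-1][1] = max(sessions[-1][1], ts + gap)\n", |
| "        else:\n", |
| "            # The gap was exceeded, so a new session starts at this element.\n", |
| "            sessions.append([ts, ts + gap])\n", |
| "    return [tuple(s) for s in sessions]\n", |
| "\n", |
| "\n", |
| "# Hypothetical event times, in days since the first event, with a 10-day gap.\n", |
| "event_days = [0, 1, 7, 34, 58, 80, 81]\n", |
| "for start, end in session_windows(event_days, gap=10):\n", |
| "    print(f'session from day {start} to day {end}')" |
| ] |
| }, |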
| { |
| "cell_type": "markdown", |
| "metadata": { |
| "id": "EBSlyeBibNL0" |
| }, |
| "source": [ |
| "<hr style=\"border: 5px solid #003262;\" />\n", |
| "\n", |
| "# 3. Putting It All Together\n", |
| "\n", |
| "Section 2 used the toy data to walk through three different windowing types. Now it is time to analyze the real data (`data/air_quality.csv`).\n", |
| "\n", |
| "Can you build one Beam pipeline that computes the AQI values for all three window types?\n" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "id": "plMjLuh-lKzr" |
| }, |
| "outputs": [], |
| "source": [ |
| "#@title Edit This Code Cell\n", |
| "..." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "cellView": "form", |
| "id": "t6vBMax0bubN" |
| }, |
| "outputs": [], |
| "source": [ |
| "# @title Expand to see the answer\n", |
| "import csv\n", |
| "import time\n", |
| "from datetime import timedelta\n", |
| "\n", |
| "import apache_beam as beam\n", |
| "import apache_beam.runners.interactive.interactive_beam as ib\n", |
| "\n", |
| "\n", |
| "def to_unix_time(time_str: str, time_format=\"%Y-%m-%d %H:%M:%S\") -> int:\n", |
| " \"\"\"Converts a time string into Unix time.\"\"\"\n", |
| " time_tuple = time.strptime(time_str, time_format)\n", |
| " return int(time.mktime(time_tuple))\n", |
| "\n", |
| "\n", |
| "def human_readable_window(window) -> str:\n", |
| "    \"\"\"Formats a window object into a human readable string.\"\"\"\n", |
| "    if isinstance(window, beam.window.GlobalWindow):\n", |
| "        return str(window)\n", |
| "    return f\"{window.start.to_utc_datetime()} - {window.end.to_utc_datetime()}\"\n", |
| "\n", |
| "\n", |
| "@beam.ptransform_fn\n", |
| "def OutputWindowInfo(pcollection):\n", |
| "    \"\"\"Output the Window information with AQI in that window.\"\"\"\n", |
| "\n", |
| "    class GetAQI(beam.DoFn):\n", |
| "        def process(self, mean_elements, window=beam.DoFn.WindowParam):\n", |
| "            yield human_readable_window(window), mean_elements\n", |
| "\n", |
| "    return (\n", |
| "        pcollection\n", |
| "        | \"Mean per window\"\n", |
| "        >> beam.combiners.Mean.Globally().without_defaults()\n", |
| "        | \"Output mean info\" >> beam.ParDo(GetAQI())\n", |
| "    )\n", |
| "\n", |
| "\n", |
| "def parse_file(element):\n", |
| "    file = csv.reader(\n", |
| "        [element],\n", |
| "        quotechar='\"',\n", |
| "        delimiter=\",\",\n", |
| "        quoting=csv.QUOTE_ALL,\n", |
| "        skipinitialspace=True,\n", |
| "    )\n", |
| "    for line in file:\n", |
| "        return line\n", |
| "\n", |
| "\n", |
| "p = beam.Pipeline()\n", |
| "\n", |
| "# get the data\n", |
| "read_text = (\n", |
| " p\n", |
| " | \"Read input file\"\n", |
| " >> beam.io.ReadFromText(\"data/air_quality.csv\", skip_header_lines=1)\n", |
| " | \"Parse file\" >> beam.Map(parse_file)\n", |
| " | \"With timestamps\"\n", |
| " >> beam.MapTuple(\n", |
| " lambda timestamp, element: beam.window.TimestampedValue(\n", |
| " float(element), to_unix_time(timestamp)\n", |
| " )\n", |
| " )\n", |
| ")\n", |
| "\n", |
| "# define the fixed window\n", |
| "window_size = timedelta(days=30).total_seconds() # in seconds\n", |
| "fixed_window = (\n", |
| " read_text\n", |
| " | \"Fixed windows\" >> beam.WindowInto(beam.window.FixedWindows(window_size))\n", |
| " | \"Output fixed window info\" >> OutputWindowInfo()\n", |
| " | \"Write fixed window info\"\n", |
| " >> beam.io.WriteToText(\"output_fixed\", file_name_suffix=\".txt\")\n", |
| ")\n", |
| "\n", |
| "# define the sliding window\n", |
| "window_period = timedelta(days=15).total_seconds() # in seconds\n", |
| "sliding_window = (\n", |
| " read_text\n", |
| " | \"Sliding windows\"\n", |
| " >> beam.WindowInto(beam.window.SlidingWindows(window_size, window_period))\n", |
| " | \"Output sliding window info\" >> OutputWindowInfo()\n", |
| " | \"Write sliding window info\"\n", |
| " >> beam.io.WriteToText(\"output_sliding\", file_name_suffix=\".txt\")\n", |
| ")\n", |
| "\n", |
| "# define the session window\n", |
| "gap_size = timedelta(days=10).total_seconds() # in seconds\n", |
| "session_window = (\n", |
| " read_text\n", |
| " | \"Session windows\" >> beam.WindowInto(beam.window.Sessions(gap_size))\n", |
| " | \"Output session window info\" >> OutputWindowInfo()\n", |
| " | \"Write session window info\"\n", |
| " >> beam.io.WriteToText(\"output_session\", file_name_suffix=\".txt\")\n", |
| ")" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/", |
| "height": 1000 |
| }, |
| "id": "trT64iyteYii", |
| "outputId": "53b1bb2b-2293-4655-fb81-e7b1ee9d36f8" |
| }, |
| "outputs": [], |
| "source": [ |
| "# check the entire graph\n", |
| "ib.show_graph(p)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "mgAmsJkbcw-f", |
| "outputId": "efc8e611-21b7-4f4b-8d08-3b25bd84dea9" |
| }, |
| "outputs": [], |
| "source": [ |
| "# Run the pipeline and wait for it to finish so the output files are complete.\n", |
| "p.run().wait_until_finish()" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "5egxx_0mdwTQ", |
| "outputId": "637a0d7c-d1fb-4351-a642-61146efa2a99" |
| }, |
| "outputs": [], |
| "source": [ |
| "! head -n 5 output_fixed*.txt" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "_-_qOrSWeT0L", |
| "outputId": "fac42a53-16bf-4498-d452-568260bd15fb" |
| }, |
| "outputs": [], |
| "source": [ |
| "! head -n 5 output_sliding*.txt" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": { |
| "colab": { |
| "base_uri": "https://localhost:8080/" |
| }, |
| "id": "E87j01DGfN_f", |
| "outputId": "304a1baf-04e5-46d8-cf95-bb789511daa4" |
| }, |
| "outputs": [], |
| "source": [ |
| "! head -n 5 output_session*.txt" |
| ] |
| } |
| ], |
| "metadata": { |
| "colab": { |
| "provenance": [] |
| }, |
| "kernelspec": { |
| "display_name": "Python 3", |
| "name": "python3" |
| }, |
| "language_info": { |
| "name": "python" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 0 |
| } |