| { |
| "cells": [ |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Licensed under the Apache License, Version 2.0 (the \"License\");\n", |
| "<!--\n", |
| " Licensed to the Apache Software Foundation (ASF) under one\n", |
| " or more contributor license agreements. See the NOTICE file\n", |
| " distributed with this work for additional information\n", |
| " regarding copyright ownership. The ASF licenses this file\n", |
| " to you under the Apache License, Version 2.0 (the\n", |
| " \"License\"); you may not use this file except in compliance\n", |
| " with the License. You may obtain a copy of the License at\n", |
| "\n", |
| " http://www.apache.org/licenses/LICENSE-2.0\n", |
| "\n", |
| " Unless required by applicable law or agreed to in writing,\n", |
| " software distributed under the License is distributed on an\n", |
| " \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", |
| " KIND, either express or implied. See the License for the\n", |
| " specific language governing permissions and limitations\n", |
| " under the License.\n", |
| "-->\n" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# Explore data from covidtracking.com\n", |
| "The data set is relatively small and used as a demonstration of working with Beam in an interactive notebook environment.\n", |
| "\n", |
| "There are two ways to get the data:\n", |
| "\n", |
| "- Get json data from APIs.\n", |
| "- Download data in csv files directly.\n", |
| "\n", |
| "We'll have a batch Beam pipeline example utilizing either method." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import json\n", |
| "import requests\n", |
| "\n", |
| "json_current='https://covidtracking.com/api/v1/states/current.json'\n", |
| "json_historical='https://covidtracking.com/api/v1/states/daily.json'\n", |
| "\n", |
| "def get_json_data(url):\n", |
| " with requests.Session() as session:\n", |
| " data = json.loads(session.get(url).text)\n", |
| " return data\n", |
| "\n", |
| "csv_current = 'https://covidtracking.com/api/v1/states/current.csv'\n", |
| "csv_historical = 'https://covidtracking.com/api/v1/states/daily.csv'\n", |
| "\n", |
| "def download_csv(url, filename):\n", |
| " if not filename.endswith('.csv'):\n", |
| " filename = filename + '.csv'\n", |
| " with requests.Session() as session:\n", |
| " with open(filename, 'wb') as f:\n", |
| " f.write(session.get(url).content)\n", |
| " return filename" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Below reads data into memory as json." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "current_data = get_json_data(json_current)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Below downloads data in csv format stored in files." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "csv_file_current = download_csv(csv_current, 'current')" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Prepare some Apache Beam dependencies." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import apache_beam as beam\n", |
| "from apache_beam.runners.interactive import interactive_beam as ib\n", |
| "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Create a Beam pipeline." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "p = beam.Pipeline(runner=InteractiveRunner())" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "You can create a PCollection from either in-memory json data or data in files." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "current_data_from_json = p | 'Create PCollection from json' >> beam.Create(current_data)\n", |
| "current_data_from_files = p | 'Create PCollection from files' >> beam.io.ReadFromText(csv_file_current, skip_header_lines=1)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "The in-memory json data is already structured." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "ib.show(current_data_from_json)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "The data from files read as plain text is not structured, we'll have to handle it.\n", |
| "\n", |
| "For a batch pipeline reading files with huge content size, it's normal to read source data from files and let Beam handle the work load." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "ib.show(current_data_from_files)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "We'll parse the plain texts into structured data with Beam SDK." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from csv import reader\n", |
| "\n", |
| "def read_headers(csv_file):\n", |
| " with open(csv_file, 'r') as f:\n", |
| " header_line = f.readline().strip()\n", |
| " return next(reader([header_line]))\n", |
| "\n", |
| "current_data_headers = read_headers(csv_file_current)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from collections import namedtuple\n", |
| "\n", |
| "UsCovidData = namedtuple('UsCovidData', current_data_headers)\n", |
| "\n", |
| "class UsCovidDataCsvReader(beam.DoFn):\n", |
| " def __init__(self, schema):\n", |
| " self._schema = schema\n", |
| " \n", |
| " def process(self, element):\n", |
| " values = [int(val) if val.isdigit() else val for val in next(reader([element]))]\n", |
| " return [self._schema(*values)]" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "current_data = current_data_from_files | 'Parse' >> beam.ParDo(UsCovidDataCsvReader(UsCovidData))" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "ib.show(current_data)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "With Interactive Beam, you can collect a PCollection into a pandas dataframe. It's useful when you just want to play with small test data sets locally on a single machine." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "df = ib.collect(current_data)\n", |
| "df.describe()" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Now let's take a deeper look into the data with the visualization feature of Interactive Beam and come up with some tasks." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "ib.show(current_data, visualize_data=True)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "We can find out that NY currently has the most positive COVID cases with above facets visualization because the data set is small (for demo).\n", |
| "\n", |
| "Now we can write a beam transform to try to get that same conclusion of which state has the highest positive number currently." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from functools import total_ordering\n", |
| "\n", |
| "@total_ordering\n", |
| "class UsCovidDataOrderByPositive:\n", |
| " def __init__(self, data):\n", |
| " self._data = data\n", |
| " \n", |
| " def __gt__(self, other):\n", |
| " return self._data.positive > other._data.positive\n", |
| "\n", |
| "\n", |
| "def maximum_positive(values):\n", |
| " return max(values) if values else None\n", |
| "\n", |
| "max_positive = (current_data \n", |
| " | 'Data OrderByPositive' >> beam.Map(lambda data: UsCovidDataOrderByPositive(data))\n", |
| " | 'Find Maximum Positive' >> beam.CombineGlobally(maximum_positive)\n", |
| " | 'Convert Back to Data' >> beam.Map(lambda orderable_data: orderable_data._data))" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "ib.show(max_positive)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "We can also try to come up with the total positive case number in the US." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "total_positive = (current_data\n", |
| " | 'Positive Per State' >> beam.Map(lambda data: data.positive)\n", |
| " | 'Total Positive' >> beam.CombineGlobally(sum))" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "ib.show(total_positive)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Now let's look at some more complicated data: the historical data.\n", |
| "\n", |
| "It contains similar data to current for each day until current day." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "csv_file_historical = download_csv(csv_historical, 'historical')" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "historical_data_from_files = p | 'Create PCollection for historical data from files' >> beam.io.ReadFromText(csv_file_historical, skip_header_lines=1)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "ib.show(historical_data_from_files)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "historical_data_headers = read_headers(csv_file_historical)\n", |
| "\n", |
| "HistoricalUsCovidData = namedtuple('HistoricalUsCovidData', historical_data_headers)\n", |
| "\n", |
| "historical_data = historical_data_from_files | 'Parse' >> beam.ParDo(UsCovidDataCsvReader(HistoricalUsCovidData))" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "ib.show(historical_data)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "For demostration, let's just take a look at NY throughout the timeline." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "class FilterByState(beam.DoFn):\n", |
| " def __init__(self, state):\n", |
| " self._state = state\n", |
| " \n", |
| " def process(self, element):\n", |
| " if element.state == self._state:\n", |
| " yield element\n", |
| "\n", |
| "ny_data = historical_data | 'Filter NY' >> beam.ParDo(FilterByState('NY'))" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "Then we do a visualization to see if there is anything worth looking for." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "ib.show(ny_data, visualize_data=True)" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "There happens to be a field named `positiveIncrease`. If not, we'll need to write some transforms to deduce the per day positive increment value.\n", |
| "\n", |
| "Now let's try to find out the date with the most `positiveIncrease` for NY." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "@total_ordering\n", |
| "class UsCovidDataOrderByPositiveIncrease:\n", |
| " def __init__(self, data):\n", |
| " self._data = data\n", |
| " \n", |
| " def __gt__(self, other):\n", |
| " self_positive_increase = self._data.positiveIncrease if self._data.positiveIncrease else 0\n", |
| " other_positive_increase = other._data.positiveIncrease if other._data.positiveIncrease else 0\n", |
| " return self_positive_increase > other_positive_increase\n", |
| "\n", |
| "\n", |
| "def maximum_positive_increase(values):\n", |
| " return max(values) if values else None\n", |
| "\n", |
| "worst_day = (ny_data\n", |
| " | 'Order By PositiveIncrease' >> beam.Map(lambda data: UsCovidDataOrderByPositiveIncrease(data))\n", |
| " | 'Maximum Positive Increase' >> beam.CombineGlobally(maximum_positive_increase)\n", |
| " | 'Convert Back to Data' >> beam.Map(lambda orderable_data: orderable_data._data)\n", |
| " | 'Extract Date' >> beam.Map(lambda data: data.date))" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "ib.show(worst_day)" |
| ] |
| } |
| ], |
| "metadata": { |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 3 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython3", |
| "version": "3.7.7" |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 4 |
| } |