{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"<!--\n",
" Licensed to the Apache Software Foundation (ASF) under one\n",
" or more contributor license agreements. See the NOTICE file\n",
" distributed with this work for additional information\n",
" regarding copyright ownership. The ASF licenses this file\n",
" to you under the Apache License, Version 2.0 (the\n",
" \"License\"); you may not use this file except in compliance\n",
" with the License. You may obtain a copy of the License at\n",
"\n",
" http://www.apache.org/licenses/LICENSE-2.0\n",
"\n",
" Unless required by applicable law or agreed to in writing,\n",
" software distributed under the License is distributed on an\n",
" \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
" KIND, either express or implied. See the License for the\n",
" specific language governing permissions and limitations\n",
" under the License.\n",
"-->\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Explore data from covidtracking.com\n",
"The data set is relatively small and used as a demonstration of working with Beam in an interactive notebook environment.\n",
"\n",
"There are two ways to get the data:\n",
"\n",
"- Get json data from APIs.\n",
"- Download data in csv files directly.\n",
"\n",
"We'll have a batch Beam pipeline example utilizing either method."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import requests\n",
"\n",
"json_current='https://covidtracking.com/api/v1/states/current.json'\n",
"json_historical='https://covidtracking.com/api/v1/states/daily.json'\n",
"\n",
"def get_json_data(url):\n",
" with requests.Session() as session:\n",
" data = json.loads(session.get(url).text)\n",
" return data\n",
"\n",
"csv_current = 'https://covidtracking.com/api/v1/states/current.csv'\n",
"csv_historical = 'https://covidtracking.com/api/v1/states/daily.csv'\n",
"\n",
"def download_csv(url, filename):\n",
" if not filename.endswith('.csv'):\n",
" filename = filename + '.csv'\n",
" with requests.Session() as session:\n",
" with open(filename, 'wb') as f:\n",
" f.write(session.get(url).content)\n",
" return filename"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Below reads data into memory as json."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"current_data = get_json_data(json_current)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Below downloads data in csv format stored in files."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"csv_file_current = download_csv(csv_current, 'current')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Prepare some Apache Beam dependencies."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.runners.interactive import interactive_beam as ib\n",
"from apache_beam.runners.interactive.interactive_runner import InteractiveRunner"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a Beam pipeline."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"p = beam.Pipeline(runner=InteractiveRunner())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can create a PCollection from either in-memory json data or data in files."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"current_data_from_json = p | 'Create PCollection from json' >> beam.Create(current_data)\n",
"current_data_from_files = p | 'Create PCollection from files' >> beam.io.ReadFromText(csv_file_current, skip_header_lines=1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The in-memory json data is already structured."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ib.show(current_data_from_json)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The data from files read as plain text is not structured, we'll have to handle it.\n",
"\n",
"For a batch pipeline reading files with huge content size, it's normal to read source data from files and let Beam handle the work load."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ib.show(current_data_from_files)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We'll parse the plain texts into structured data with Beam SDK."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from csv import reader\n",
"\n",
"def read_headers(csv_file):\n",
" with open(csv_file, 'r') as f:\n",
" header_line = f.readline().strip()\n",
" return next(reader([header_line]))\n",
"\n",
"current_data_headers = read_headers(csv_file_current)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from collections import namedtuple\n",
"\n",
"UsCovidData = namedtuple('UsCovidData', current_data_headers)\n",
"\n",
"class UsCovidDataCsvReader(beam.DoFn):\n",
" def __init__(self, schema):\n",
" self._schema = schema\n",
" \n",
" def process(self, element):\n",
" values = [int(val) if val.isdigit() else val for val in next(reader([element]))]\n",
" return [self._schema(*values)]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"current_data = current_data_from_files | 'Parse' >> beam.ParDo(UsCovidDataCsvReader(UsCovidData))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ib.show(current_data)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"With Interactive Beam, you can collect a PCollection into a pandas dataframe. It's useful when you just want to play with small test data sets locally on a single machine."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df = ib.collect(current_data)\n",
"df.describe()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's take a deeper look into the data with the visualization feature of Interactive Beam and come up with some tasks."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ib.show(current_data, visualize_data=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can find out that NY currently has the most positive COVID cases with above facets visualization because the data set is small (for demo).\n",
"\n",
"Now we can write a beam transform to try to get that same conclusion of which state has the highest positive number currently."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from functools import total_ordering\n",
"\n",
"@total_ordering\n",
"class UsCovidDataOrderByPositive:\n",
" def __init__(self, data):\n",
" self._data = data\n",
" \n",
" def __gt__(self, other):\n",
" return self._data.positive > other._data.positive\n",
"\n",
"\n",
"def maximum_positive(values):\n",
" return max(values) if values else None\n",
"\n",
"max_positive = (current_data \n",
" | 'Data OrderByPositive' >> beam.Map(lambda data: UsCovidDataOrderByPositive(data))\n",
" | 'Find Maximum Positive' >> beam.CombineGlobally(maximum_positive)\n",
" | 'Convert Back to Data' >> beam.Map(lambda orderable_data: orderable_data._data))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ib.show(max_positive)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can also try to come up with the total positive case number in the US."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"total_positive = (current_data\n",
" | 'Positive Per State' >> beam.Map(lambda data: data.positive)\n",
" | 'Total Positive' >> beam.CombineGlobally(sum))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ib.show(total_positive)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now let's look at some more complicated data: the historical data.\n",
"\n",
"It contains similar data to current for each day until current day."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"csv_file_historical = download_csv(csv_historical, 'historical')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"historical_data_from_files = p | 'Create PCollection for historical data from files' >> beam.io.ReadFromText(csv_file_historical, skip_header_lines=1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ib.show(historical_data_from_files)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"historical_data_headers = read_headers(csv_file_historical)\n",
"\n",
"HistoricalUsCovidData = namedtuple('HistoricalUsCovidData', historical_data_headers)\n",
"\n",
"historical_data = historical_data_from_files | 'Parse' >> beam.ParDo(UsCovidDataCsvReader(HistoricalUsCovidData))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ib.show(historical_data)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For demostration, let's just take a look at NY throughout the timeline."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class FilterByState(beam.DoFn):\n",
" def __init__(self, state):\n",
" self._state = state\n",
" \n",
" def process(self, element):\n",
" if element.state == self._state:\n",
" yield element\n",
"\n",
"ny_data = historical_data | 'Filter NY' >> beam.ParDo(FilterByState('NY'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Then we do a visualization to see if there is anything worth looking for."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ib.show(ny_data, visualize_data=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There happens to be a field named `positiveIncrease`. If not, we'll need to write some transforms to deduce the per day positive increment value.\n",
"\n",
"Now let's try to find out the date with the most `positiveIncrease` for NY."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@total_ordering\n",
"class UsCovidDataOrderByPositiveIncrease:\n",
" def __init__(self, data):\n",
" self._data = data\n",
" \n",
" def __gt__(self, other):\n",
" self_positive_increase = self._data.positiveIncrease if self._data.positiveIncrease else 0\n",
" other_positive_increase = other._data.positiveIncrease if other._data.positiveIncrease else 0\n",
" return self_positive_increase > other_positive_increase\n",
"\n",
"\n",
"def maximum_positive_increase(values):\n",
" return max(values) if values else None\n",
"\n",
"worst_day = (ny_data\n",
" | 'Order By PositiveIncrease' >> beam.Map(lambda data: UsCovidDataOrderByPositiveIncrease(data))\n",
" | 'Maximum Positive Increase' >> beam.CombineGlobally(maximum_positive_increase)\n",
" | 'Convert Back to Data' >> beam.Map(lambda orderable_data: orderable_data._data)\n",
" | 'Extract Date' >> beam.Map(lambda data: data.date))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ib.show(worst_day)"
]
}
],
"metadata": {
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.7"
}
},
"nbformat": 4,
"nbformat_minor": 4
}