<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Example DataFrame API Pipelines
This module contains example pipelines that use the [Beam DataFrame
API](https://beam.apache.org/documentation/dsls/dataframes/overview/).
## Pre-requisites
You must have `apache-beam>=2.30.0` installed in order to run these pipelines,
because the `apache_beam.examples.dataframe` module was added in that release.
Using the DataFrame API also requires a compatible pandas version to be
installed; see the
[documentation](https://beam.apache.org/documentation/dsls/dataframes/overview/#pre-requisites)
for details.
## Wordcount Pipeline
Wordcount is the "Hello World" of data analytics systems, so of course we
had to implement it for the Beam DataFrame API! See [`wordcount.py`](./wordcount.py) for the
implementation. Note that it demonstrates how to integrate the DataFrame API with
a larger Beam pipeline by using [Beam
Schemas](https://beam.apache.org/documentation/programming-guide/#what-is-a-schema)
in conjunction with
[to_dataframe](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_dataframe)
and
[to_pcollection](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).
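The core pattern looks roughly like the following minimal sketch (with inline
data rather than the actual `wordcount.py` code; the `num_words` field name is
an illustrative choice, not from the example itself):

```python
import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe, to_pcollection

with beam.Pipeline() as p:
  words = (
      p
      | beam.Create(['king', 'lear', 'king'])
      # beam.Row attaches a schema, which is what makes to_dataframe possible.
      | beam.Map(lambda word: beam.Row(word=word)))

  # Deferred pandas-style operations; nothing runs until the pipeline does.
  df = to_dataframe(words)
  counts = df.groupby('word').word.count().rename('num_words')

  # Convert back to a schema'd PCollection for ordinary Beam transforms.
  (
      to_pcollection(counts, include_indexes=True)
      | beam.Map(lambda row: f'{row.word}: {row.num_words}')
      | beam.Map(print))
```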
### Running the pipeline
To run the pipeline locally:
```sh
python -m apache_beam.examples.dataframe.wordcount \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output counts
```
This will produce files like `counts-XXXXX-of-YYYYY` with contents like:
```
KING: 243
LEAR: 236
DRAMATIS: 1
PERSONAE: 1
king: 65
of: 447
Britain: 2
OF: 15
FRANCE: 10
DUKE: 3
...
```
## Taxi Ride Example Pipelines
[`taxiride.py`](./taxiride.py) contains implementations for two DataFrame pipelines that
process the well-known [NYC Taxi
dataset](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page). These
pipelines don't use any Beam primitives. Instead they build end-to-end pipelines
using the DataFrame API, by leveraging [DataFrame
IOs](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html).
The module defines two pipelines. The `location_id_agg` pipeline does a grouped
aggregation on the drop-off location ID. The `borough_enrich` pipeline extends
this example by joining the zone lookup table to find the borough where each
drop-off occurred, and aggregating per borough.
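In outline, the two pipelines look something like the sketch below. This is a
hedged approximation, not the real code: the column names (`DOLocationID`,
`passenger_count`, `LocationID`, `Borough`) come from the NYC taxi schema and
the sample outputs shown further down, and `taxiride.py` remains the
authoritative implementation.

```python
import apache_beam as beam
from apache_beam.dataframe.io import read_csv

with beam.Pipeline() as p:
  rides = p | 'ReadRides' >> read_csv(
      'gs://apache-beam-samples/nyc_taxi/misc/sample.csv')

  # location_id_agg: sum passenger counts per drop-off location ID.
  agg = rides.groupby('DOLocationID').passenger_count.sum()
  agg.to_csv('aggregation.csv')

  # borough_enrich: join the zone lookup table on the drop-off location,
  # then aggregate per borough.
  zones = p | 'ReadZones' >> read_csv(
      'gs://apache-beam-samples/nyc_taxi/misc/taxi+_zone_lookup.csv')
  enriched = rides.merge(zones, left_on='DOLocationID', right_on='LocationID')
  enriched.groupby('Borough').passenger_count.sum().to_csv('enrich.csv')
```

Note that even the reads and writes go through the DataFrame API
(`read_csv`/`to_csv`); no `ParDo`, `GroupByKey`, or other Beam primitives
appear in user code.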
### Data
Some snapshots of NYC taxi data have been staged in
`gs://apache-beam-samples` for use with these example pipelines:
- `gs://apache-beam-samples/nyc_taxi/2017/yellow_tripdata_2017-*.csv`: CSV files
containing taxi ride data for each month of 2017 (similar directories exist
for 2018 and 2019).
- `gs://apache-beam-samples/nyc_taxi/misc/sample.csv`: A sample of 1 million
records from the beginning of 2019. At ~85 MiB this is a manageable size for
processing locally.
- `gs://apache-beam-samples/nyc_taxi/misc/taxi+_zone_lookup.csv`: Lookup table
with information about Zone IDs. Used by the `borough_enrich` pipeline.
### Running `location_id_agg`
To run the aggregation pipeline locally, use the following command:
```sh
python -m apache_beam.examples.dataframe.taxiride \
--pipeline location_id_agg \
--input gs://apache-beam-samples/nyc_taxi/misc/sample.csv \
--output aggregation.csv
```
This will write the output to files like `aggregation.csv-XXXXX-of-YYYYY` with
contents like:
```
DOLocationID,passenger_count
1,3852
3,130
4,7725
5,24
6,37
7,7429
8,24
9,180
10,938
...
```
### Running `borough_enrich`
To run the enrichment pipeline locally, use the following command:
```sh
python -m apache_beam.examples.dataframe.taxiride \
--pipeline borough_enrich \
--input gs://apache-beam-samples/nyc_taxi/misc/sample.csv \
--output enrich.csv
```
This will write the output to files like `enrich.csv-XXXXX-of-YYYYY` with
contents like:
```
Borough,passenger_count
Bronx,13645
Brooklyn,70654
EWR,3852
Manhattan,1417124
Queens,81138
Staten Island,531
Unknown,28527
```
## Flight Delay pipeline (added in 2.31.0)
[`flight_delays.py`](./flight_delays.py) contains an implementation of
a pipeline that processes the flight on-time data from
`bigquery-samples.airline_ontime_data.flights`. It uses a conventional Beam
pipeline to read from BigQuery, apply a 24-hour rolling window, and define a
Beam schema for the data. Then it converts to DataFrames in order to perform
a complex aggregation using `GroupBy.apply`, and write the result out with
`to_csv`. Note that the DataFrame computation respects the 24-hour window
applied above, and results are partitioned into separate files per day.
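The shape of that pipeline is roughly as follows. This is a sketch under
assumptions, not the real `flight_delays.py`: the schema fields and the
per-group function are inferred from the sample output below, the
`to_unix_time` helper is hypothetical, and a fixed daily window stands in for
the example's windowing.

```python
import datetime

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe


def to_unix_time(date_str):
  # Hypothetical helper: parse the flight's 'date' field into epoch seconds.
  return datetime.datetime.strptime(date_str, '%Y-%m-%d').timestamp()


with beam.Pipeline() as p:
  flights = (
      p
      | beam.io.ReadFromBigQuery(
          table='bigquery-samples:airline_ontime_data.flights')
      # Timestamp each record and window it so the DataFrame aggregation
      # below is computed independently for each 24-hour period.
      | beam.Map(lambda row: beam.window.TimestampedValue(
          row, to_unix_time(row['date'])))
      | beam.WindowInto(beam.window.FixedWindows(24 * 60 * 60))
      # A Beam schema makes the collection convertible to a DataFrame.
      | beam.Map(lambda row: beam.Row(
          airline=str(row['airline']),
          departure_delay=float(row['departure_delay']),
          arrival_delay=float(row['arrival_delay']))))

  df = to_dataframe(flights)
  # GroupBy.apply runs a pandas function once per airline, per window.
  delays = df.groupby('airline').apply(
      lambda g: g[['departure_delay', 'arrival_delay']].mean())
  delays.to_csv('delays.csv')
```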
### Running the pipeline
To run the pipeline locally:
```sh
python -m apache_beam.examples.dataframe.flight_delays \
--start_date 2012-12-24 \
--end_date 2012-12-25 \
--output gs://<bucket>/<dir>/delays.csv \
--project <gcp-project> \
--temp_location gs://<bucket>/<dir>
```
Note that a GCP `project` and `temp_location` are required for reading from BigQuery.
This will produce files like
`gs://<bucket>/<dir>/delays.csv-2012-12-24T00:00:00-2012-12-25T00:00:00-XXXXX-of-YYYYY`
with contents tracking average delays per airline on that day, for example:
```
airline,departure_delay,arrival_delay
EV,10.01901901901902,4.431431431431432
HA,-1.0829015544041452,0.010362694300518135
UA,19.142555438225976,11.07180570221753
VX,62.755102040816325,62.61224489795919
WN,12.074298711144806,6.717968157695224
...
```