#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""An example that reads New York City for hire vehicle trips.
An example that reads New York City for hire vehicle trips data from Google
Cloud Storage and calculates various statistics including the passenger's
price per trip, price per mile, and price per minute.
Data originally downloaded as parquet files from the New York City
government
`website <https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page>`_
before being converted to avro format. Here is the
`documentation
<https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_hvfhs.pdf>`_
for the data format and schema.
To execute this pipeline locally using the DirectRunner, specify an
output prefix on GCS:::
--output gs://YOUR_OUTPUT_PREFIX
To execute this pipeline using the Google Cloud Dataflow service, specify
pipeline configuration in addition to the above:::
--job_name NAME_FOR_YOUR_JOB
--project YOUR_PROJECT_ID
--region GCE_REGION
--staging_location gs://YOUR_STAGING_DIRECTORY
--temp_location gs://YOUR_TEMPORARY_DIRECTORY
--runner DataflowRunner
The default input is
``gs://apache-beam-samples/nyc_trip/avro/fhvhv_tripdata_2023-02.avro`` and
takes about 15 minutes with 6 workers running on Dataflow. The default input
can be overridden with --input argument. More data can be accessed at
``gs://apache-beam-samples/nyc_trip/avro/*``.
Additionally, the original parquet files can be found at
``gs://apache-beam-samples/nyc_trip/parquet/*``.
"""
# pytype: skip-file

import argparse
import datetime
import logging

import pytz

import apache_beam as beam
from apache_beam.io import ReadFromAvro
from apache_beam.io import WriteToAvro
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
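
# Avro schema for the aggregated output records: one record per
# (service, day-of-week) pair, holding the accumulated totals and the
# derived per-trip, per-mile, and per-minute statistics computed below.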
SCHEMA = {
    'fields': [
        {'name': 'service', 'type': 'string'},
        {'name': 'day', 'type': 'string'},
        {'name': 'total_price', 'type': 'double'},
        {'name': 'total_driver_pay', 'type': 'double'},
        {'name': 'total_trip_miles', 'type': 'double'},
        {'name': 'total_trip_minutes', 'type': 'double'},
        {'name': 'total_number_of_trips', 'type': 'long'},
        {'name': 'price_per_trip', 'type': 'double'},
        {'name': 'price_per_mile', 'type': 'double'},
        {'name': 'price_per_minute', 'type': 'double'},
        {'name': 'driver_pay_per_trip', 'type': 'double'},
        {'name': 'driver_pay_per_mile', 'type': 'double'},
        {'name': 'driver_pay_per_minute', 'type': 'double'},
        {'name': 'miles_per_hour', 'type': 'double'},
    ],
    'name': 'nyc_trip_prices',
    'type': 'record',
}


class CreateKeyWithServiceAndDay(beam.DoFn):
  """Creates a (key, value) pair for each trip record.

  The key is the combination of the HVFHS license number, converted to the
  ride-sharing company name, and the day of the week of the ride. The value
  is the original record dictionary. Records whose license number is not in
  the mapping are dropped.
  """
  def process(self, record: dict):
    options = {
        'HV0002': 'Juno', 'HV0003': 'Uber', 'HV0004': 'Via', 'HV0005': 'Lyft'
    }
    service = options.get(record['hvfhs_license_num'])
    if service:
      # Pick the first non-None timestamp, in order of preference.
      timestamp = None
      for k in ('request_datetime',
                'on_scene_datetime',
                'pickup_datetime',
                'dropoff_datetime'):
        timestamp = timestamp or record[k]
        if timestamp:
          break
      # Timestamps are in milliseconds; convert to a New York day name.
      day_of_the_week = datetime.datetime.fromtimestamp(
          timestamp / 1000.0,
          tz=pytz.timezone('America/New_York')).strftime('%a')
      yield (service, day_of_the_week), record
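
# For illustration: a record with hvfhs_license_num 'HV0003' and a
# request_datetime falling on a Tuesday in New York would be emitted as
# (('Uber', 'Tue'), record). (Hypothetical values.)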


class CalculatePricePerAttribute(beam.CombineFn):
  """Calculates per-trip, per-mile, and per-minute price statistics.

  Accumulates the total driver pay, price, miles, minutes, and number of
  trips for each for-hire vehicle service and day of the week, then derives
  the price per mile, minute, and trip for both the driver and the passenger.
  """
  def create_accumulator(self):
    total_price = 0.0
    total_driver_pay = 0.0
    total_trip_miles = 0.0
    total_trip_time = 0.0
    total_number_of_trips = 0
    accumulator = (
        total_price,
        total_driver_pay,
        total_trip_miles,
        total_trip_time,
        total_number_of_trips,
    )
    return accumulator

  def add_input(self, accumulator, record):
    (
        total_price,
        total_driver_pay,
        total_trip_miles,
        total_trip_time,
        total_number_of_trips,
    ) = accumulator
    return (
        # The passenger's price is the fare plus every fee, tax, surcharge,
        # and tip that is present on the record.
        total_price + sum(
            record[name] for name in (
                'base_passenger_fare',
                'tolls',
                'bcf',
                'sales_tax',
                'congestion_surcharge',
                'airport_fee',
                'tips') if record[name] is not None),
        total_driver_pay + record['driver_pay'] + record['tips'],
        total_trip_miles + record['trip_miles'],
        total_trip_time + record['trip_time'],
        total_number_of_trips + 1,
    )

  def merge_accumulators(self, accumulators):
    # Sums each accumulator field across all partial accumulators.
    return tuple(sum(item) for item in zip(*accumulators))

  def extract_output(self, accumulator):
    (
        total_price,
        total_driver_pay,
        total_trip_miles,
        total_trip_time,
        total_number_of_trips,
    ) = accumulator
    # trip_time is accumulated in seconds; convert to minutes.
    total_trip_minutes = total_trip_time / 60
    return {
        'total_driver_pay': total_driver_pay,
        'total_price': total_price,
        'total_trip_miles': total_trip_miles,
        'total_trip_minutes': total_trip_minutes,
        'total_number_of_trips': total_number_of_trips,
        'price_per_trip': total_price / total_number_of_trips,
        'price_per_mile': total_price / total_trip_miles,
        'price_per_minute': total_price / total_trip_minutes,
        'driver_pay_per_trip': total_driver_pay / total_number_of_trips,
        'driver_pay_per_mile': total_driver_pay / total_trip_miles,
        'driver_pay_per_minute': total_driver_pay / total_trip_minutes,
        'miles_per_hour': total_trip_miles / (total_trip_minutes / 60),
    }
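
# A minimal sketch of how this CombineFn behaves outside a pipeline
# (illustrative record values; only the fields read by add_input are shown):
#
#   fn = CalculatePricePerAttribute()
#   acc = fn.add_input(
#       fn.create_accumulator(), {
#           'base_passenger_fare': 20.0, 'tolls': 0.0, 'bcf': 0.5,
#           'sales_tax': 1.75, 'congestion_surcharge': 2.75,
#           'airport_fee': None, 'tips': 3.0, 'driver_pay': 15.0,
#           'trip_miles': 5.0, 'trip_time': 900.0,
#       })
#   fn.extract_output(acc)['price_per_trip']
#   # -> 20 + 0 + 0.5 + 1.75 + 2.75 + 3 = 28.0 (airport_fee is None, skipped)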


def flatten_group(element):
  """Flattens the key, value pair to a single record dictionary."""
  key, record = element
  service, day_of_the_week = key
  record['service'] = service
  record['day'] = day_of_the_week
  return record
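
# Example (illustrative values): flatten_group((('Uber', 'Tue'), stats))
# returns the stats dictionary with 'service': 'Uber' and 'day': 'Tue' added,
# matching the fields declared in SCHEMA.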


def run(argv=None):
  """Runs the New York City trips pipeline.

  Args:
    argv: Pipeline options as a list of arguments.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      default=
      'gs://apache-beam-samples/nyc_trip/avro/fhvhv_tripdata_2023-02.avro',
      help='Input file of NYC FHV data to process. A larger dataset can be '
      'found here: gs://apache-beam-samples/nyc_trip/avro/*',
  )
  parser.add_argument(
      '--output',
      dest='output',
      help='Output file to write results to.',
      required=True,
  )
  known_args, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  with beam.Pipeline(options=pipeline_options) as p:
    (  # pylint: disable=expression-not-assigned
        p
        | ReadFromAvro(known_args.input)
        # Keep records where every required field is present and at least
        # one of the timestamps is not None.
        | beam.Filter(
            lambda record: all(
                record[k] is not None for k in (
                    'hvfhs_license_num',
                    'trip_miles',
                    'trip_time',
                    'base_passenger_fare',
                    'tips',
                    'driver_pay')) and any(
                record[k] is not None for k in (
                    'request_datetime',
                    'on_scene_datetime',
                    'pickup_datetime',
                    'dropoff_datetime')))
        | beam.ParDo(CreateKeyWithServiceAndDay())
        | beam.CombinePerKey(CalculatePricePerAttribute())
        | beam.Map(flatten_group)
        | WriteToAvro(known_args.output, SCHEMA, file_name_suffix='.avro'))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()