blob: c2c467b39eaeedb7fe2d084af8894b51313b1387 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A pipeline using dataframes to compute typical flight delay times."""
# pytype: skip-file
from __future__ import absolute_import
import argparse
import logging
import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.options.pipeline_options import PipelineOptions
def get_mean_delay_at_top_airports(airline_df):
arr = airline_df.rename(columns={
'arrival_airport': 'airport'
}).airport.value_counts()
dep = airline_df.rename(columns={
'departure_airport': 'airport'
}).airport.value_counts()
total = arr + dep
# Note we keep all to include duplicates - this ensures the result is
# deterministic.
# NaNs can be included in the output in pandas 1.4.0 and above, so we
# explicitly drop them.
top_airports = total.nlargest(10, keep='all').dropna()
at_top_airports = airline_df['arrival_airport'].isin(
top_airports.index.values)
return airline_df[at_top_airports].mean()
def input_date(date):
import datetime
parsed = datetime.datetime.strptime(date, '%Y-%m-%d')
if (parsed > datetime.datetime(2012, 12, 31) or
parsed < datetime.datetime(2002, 1, 1)):
raise ValueError("There's only data from 2002-01-01 to 2012-12-31")
return date
def run_flight_delay_pipeline(
pipeline, start_date=None, end_date=None, output=None):
query = f"""
SELECT
FlightDate AS date,
IATA_CODE_Reporting_Airline AS airline,
Origin AS departure_airport,
Dest AS arrival_airport,
DepDelay AS departure_delay,
ArrDelay AS arrival_delay
FROM `apache-beam-testing.airline_ontime_data.flights`
WHERE
FlightDate >= '{start_date}' AND FlightDate <= '{end_date}' AND
DepDelay IS NOT NULL AND ArrDelay IS NOT NULL
"""
# Import this here to avoid pickling the main session.
import time
from apache_beam import window
def to_unixtime(s):
return time.mktime(s.timetuple())
# The pipeline will be run on exiting the with block.
with pipeline as p:
tbl = (
p
| 'read table' >> beam.io.ReadFromBigQuery(
query=query, use_standard_sql=True)
| 'assign timestamp' >>
beam.Map(lambda x: window.TimestampedValue(x, to_unixtime(x['date'])))
# Use beam.Select to make sure data has a schema
# The casts in lambdas ensure data types are properly inferred
| 'set schema' >> beam.Select(
date=lambda x: str(x['date']),
airline=lambda x: str(x['airline']),
departure_airport=lambda x: str(x['departure_airport']),
arrival_airport=lambda x: str(x['arrival_airport']),
departure_delay=lambda x: float(x['departure_delay']),
arrival_delay=lambda x: float(x['arrival_delay'])))
daily = tbl | 'daily windows' >> beam.WindowInto(
beam.window.FixedWindows(60 * 60 * 24))
# group the flights data by carrier
df = to_dataframe(daily)
result = df.groupby('airline').apply(get_mean_delay_at_top_airports)
result.to_csv(output)
def run(argv=None):
"""Main entry point; defines and runs the flight delay pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--start_date',
dest='start_date',
type=input_date,
default='2012-12-22',
help='YYYY-MM-DD lower bound (inclusive) for input dataset.')
parser.add_argument(
'--end_date',
dest='end_date',
type=input_date,
default='2012-12-26',
help='YYYY-MM-DD upper bound (inclusive) for input dataset.')
parser.add_argument(
'--output',
dest='output',
required=True,
help='Location to write the output.')
known_args, pipeline_args = parser.parse_known_args(argv)
run_flight_delay_pipeline(
beam.Pipeline(options=PipelineOptions(pipeline_args)),
start_date=known_args.start_date,
end_date=known_args.end_date,
output=known_args.output)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()