blob: 1fbf763e5005e0e772b5dba19680b0608cb765e1 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""An example workflow that demonstrates filters and other features.
- Reading and writing data from BigQuery.
- Manipulating BigQuery rows (as Python dicts) in memory.
- Global aggregates.
- Filtering PCollections using both user-specified parameters
as well as global aggregates computed during pipeline execution.
"""
from __future__ import absolute_import
import argparse
import logging
import apache_beam as beam
from apache_beam.pvalue import AsSingleton
def filter_cold_days(input_data, month_filter):
  """Builds the pipeline segment selecting colder-than-average days.

  Args:
    input_data: a PCollection of dictionaries representing table rows. Each
      dictionary must have the keys ['year', 'month', 'day', 'mean_temp'].
    month_filter: an int representing the month for which colder-than-average
      days should be returned.

  Returns:
    A PCollection of dictionaries with the same keys described above, one per
    day in the requested month whose mean temperature is below the global
    mean temperature computed over the entire dataset.
  """
  # Keep only the columns this workflow cares about — a projection, as in
  # SELECT year, month, day, mean_temp FROM InputTable.
  wanted = ('year', 'month', 'day', 'mean_temp')

  def project(row):
    return {field: row[field] for field in wanted}

  projected = input_data | 'Projected' >> beam.Map(project)

  # Single-value side input: the mean temperature across every projected row.
  temps = projected | 'ExtractMean' >> beam.Map(lambda r: r['mean_temp'])
  global_mean = AsSingleton(
      temps | 'GlobalMean' >> beam.combiners.Mean.Globally())

  # Restrict to the requested month, then keep only the days whose mean
  # temperature falls below the dataset-wide mean (via the side input).
  in_month = projected | 'DesiredMonth' >> beam.Filter(
      lambda r: r['month'] == month_filter)
  return in_month | 'BelowMean' >> beam.Filter(
      lambda r, mean: r['mean_temp'] < mean, global_mean)
def run(argv=None):
  """Constructs and runs the example filtering pipeline.

  Args:
    argv: optional list of command-line arguments; defaults to sys.argv
      when None (the argparse default). Unrecognized flags are forwarded
      to the Beam pipeline as pipeline options.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      help='BigQuery table to read from.',
                      default='clouddataflow-readonly:samples.weather_stations')
  parser.add_argument('--output',
                      required=True,
                      help='BigQuery table to write to.')
  parser.add_argument('--month_filter',
                      default=7,
                      # type=int is required: without it a command-line value
                      # arrives as a string, and the month equality filter in
                      # filter_cold_days (int 'month' column == month_filter)
                      # would silently match no rows.
                      type=int,
                      help='Numeric value of month to filter on.')
  known_args, pipeline_args = parser.parse_known_args(argv)
  # The `with` block runs the pipeline and waits for completion on exit.
  with beam.Pipeline(argv=pipeline_args) as p:
    input_data = p | beam.io.Read(beam.io.BigQuerySource(known_args.input))
    # pylint: disable=expression-not-assigned
    (filter_cold_days(input_data, known_args.month_filter)
     | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink(
         known_args.output,
         schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
if __name__ == '__main__':
  # Surface INFO-level pipeline progress messages when run as a script.
  logging.getLogger().setLevel(logging.INFO)
  run()