| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """An example workflow that demonstrates filters and other features. |
| |
| - Reading and writing data from BigQuery. |
| - Manipulating BigQuery rows (as Python dicts) in memory. |
| - Global aggregates. |
| - Filtering PCollections using both user-specified parameters |
| as well as global aggregates computed during pipeline execution. |
| """ |
| |
| from __future__ import absolute_import |
| |
| import argparse |
| import logging |
| |
| import apache_beam as beam |
| from apache_beam.pvalue import AsSingleton |
| |
| |
def filter_cold_days(input_data, month_filter):
  """Selects days in one month that were colder than the dataset-wide mean.

  Args:
    input_data: a PCollection of dicts representing table rows. Each dict
      must contain the keys 'year', 'month', 'day' and 'mean_temp'.
    month_filter: the month value to keep; compared with ``==`` against each
      row's 'month' field.

  Returns:
    A PCollection of dicts containing only the four keys listed above. Each
    element is a day in the requested month whose 'mean_temp' is strictly
    below the mean of 'mean_temp' over the entire input dataset.
  """
  wanted_keys = ['year', 'month', 'day', 'mean_temp']

  def _project(row):
    # Keep only the needed columns, i.e.
    # SELECT year, month, day, mean_temp FROM InputTable.
    return {key: row[key] for key in wanted_keys}

  projected = input_data | 'Projected' >> beam.Map(_project)

  # The mean is taken over ALL projected rows (not only the selected month)
  # and is handed to the final filter as a singleton side input.
  temperatures = projected | 'ExtractMean' >> beam.Map(
      lambda row: row['mean_temp'])
  overall_mean = AsSingleton(
      temperatures | 'GlobalMean' >> beam.combiners.Mean.Globally())

  # Restrict to the month of interest, then keep days below the overall mean.
  in_month = projected | 'DesiredMonth' >> beam.Filter(
      lambda row: row['month'] == month_filter)
  return in_month | 'BelowMean' >> beam.Filter(
      lambda row, mean: row['mean_temp'] < mean, overall_mean)
| |
| |
def run(argv=None):
  """Constructs and runs the example filtering pipeline.

  Args:
    argv: command-line arguments to parse. ``None`` makes argparse read
      ``sys.argv[1:]``. Arguments this parser does not recognize are passed
      through to ``beam.Pipeline`` as pipeline options.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      help='BigQuery table to read from.',
                      default='clouddataflow-readonly:samples.weather_stations')
  parser.add_argument('--output',
                      required=True,
                      help='BigQuery table to write to.')
  # type=int is required: without it a value given on the command line is a
  # str (e.g. '7'). That str never compares equal to the row's 'month' field,
  # which is written as INTEGER in the output schema below, so the month
  # filter would silently drop every row.
  parser.add_argument('--month_filter',
                      type=int,
                      default=7,
                      help='Numeric value of month to filter on.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  with beam.Pipeline(argv=pipeline_args) as p:

    input_data = p | beam.io.Read(beam.io.BigQuerySource(known_args.input))

    # pylint: disable=expression-not-assigned
    (filter_cold_days(input_data, known_args.month_filter)
     | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink(
         known_args.output,
         schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
| |
| |
if __name__ == '__main__':
  # The root logger defaults to WARNING; lower its threshold to INFO so
  # INFO-level records are not filtered out while the pipeline runs.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  run()