#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""An example that processes streaming NYC Taxi data with SqlTransform.
This example reads from the PubSub NYC Taxi stream described in
https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, aggregates
the data in 15s windows using SqlTransform, and writes the output to
a user-defined PubSub topic.
Java 8 must be available to run this pipeline, and the
--experiments=use_runner_v2 flag must be passed when running on Dataflow.
Docker must also be available to run this pipeline locally.
"""
# pytype: skip-file
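
# A hypothetical invocation on Dataflow, assuming this file is saved as
# sql_taxi.py (the project, region, bucket, and topic names below are
# placeholders; unrecognized flags are forwarded to the runner via
# pipeline_args):
#
#   python sql_taxi.py \
#     --output_topic=projects/my-project/topics/my-topic \
#     --runner=DataflowRunner \
#     --project=my-project \
#     --region=us-central1 \
#     --temp_location=gs://my-bucket/tmp \
#     --experiments=use_runner_v2
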
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.sql import SqlTransform


def run(output_topic, pipeline_args):
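  # save_main_session=True makes top-level imports available to functions
  # that are serialized and executed on remote workers; streaming=True is
  # required because PubSub is an unbounded source.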
  pipeline_options = PipelineOptions(
      pipeline_args, save_main_session=True, streaming=True)

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
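        # Read from the public taxi rides topic; the 'ts' attribute on each
        # message supplies the event timestamp used for windowing downstream.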
        | beam.io.ReadFromPubSub(
            topic='projects/pubsub-public-data/topics/taxirides-realtime',
            timestamp_attribute="ts").with_output_types(bytes)
        | "Parse JSON payload" >> beam.Map(json.loads)
        # Use beam.Row to create a schema-aware PCollection
        | "Create beam Row" >> beam.Map(
            lambda x: beam.Row(
                ride_status=str(x['ride_status']),
                passenger_count=int(x['passenger_count'])))
        # SqlTransform computes its results within the existing windows
        | "15s fixed windows" >> beam.WindowInto(beam.window.FixedWindows(15))
        # Aggregate drop offs and pick ups that occur within each 15s window
        | SqlTransform(
            """
            SELECT
              ride_status,
              COUNT(*) AS num_rides,
              SUM(passenger_count) AS total_passengers
            FROM PCOLLECTION
            WHERE NOT ride_status = 'enroute'
            GROUP BY ride_status""")
        # SqlTransform yields python objects with attributes corresponding to
        # the outputs of the query.
        # Collect those attributes, as well as window information, into a dict
        | "Assemble Dictionary" >> beam.Map(
            lambda row,
            window=beam.DoFn.WindowParam: {
                "ride_status": row.ride_status,
                "num_rides": row.num_rides,
                "total_passengers": row.total_passengers,
                "window_start": window.start.to_rfc3339(),
                "window_end": window.end.to_rfc3339()
            })
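        # An illustrative output record (field values are made up):
        #   {"ride_status": "pickup", "num_rides": 94,
        #    "total_passengers": 210,
        #    "window_start": "2022-03-01T00:00:00Z",
        #    "window_end": "2022-03-01T00:00:15Z"}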
| "Convert to JSON" >> beam.Map(json.dumps)
| "UTF-8 encode" >> beam.Map(lambda s: s.encode("utf-8"))
| beam.io.WriteToPubSub(topic=output_topic))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)

  import argparse
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--output_topic',
      dest='output_topic',
      required=True,
      help=(
          'Cloud PubSub topic to write to (e.g. '
          'projects/my-project/topics/my-topic); must be created prior to '
          'running the pipeline.'))
  known_args, pipeline_args = parser.parse_known_args()
  run(known_args.output_topic, pipeline_args)
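
# One way to inspect the pipeline's output is to attach a subscription to the
# output topic and pull from it with gcloud (a sketch; the subscription name
# and topic are placeholders):
#
#   gcloud pubsub subscriptions create taxi-sql-sub \
#     --topic=projects/my-project/topics/my-topic
#   gcloud pubsub subscriptions pull taxi-sql-sub --auto-ack --limit=5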