---
type: languages
title: "Apache Beam Python Streaming Pipelines"
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Python Streaming Pipelines
Python streaming pipeline execution became available (with some
[limitations](#unsupported-features)) starting with Beam SDK version 2.5.0.
## Why use streaming execution?
Beam creates an unbounded PCollection if your pipeline reads from a streaming or
continuously-updating data source (such as Cloud Pub/Sub). A runner must
process an unbounded PCollection using a streaming job that runs continuously,
as the entire collection is never available for processing at any one time.
[Size and boundedness](/documentation/programming-guide/#size-and-boundedness)
has more information about bounded and unbounded collections.
## Modifying a pipeline to use stream processing
To modify a batch pipeline to support streaming, you must make the following
code changes:
* Use an I/O connector that supports reading from an unbounded source.
* Use an I/O connector that supports writing to an unbounded sink.
* Choose a [windowing strategy](/documentation/programming-guide/index.html#windowing).
The Beam SDK for Python includes two I/O connectors that support unbounded
PCollections: Google Cloud Pub/Sub (reading and writing) and Google BigQuery
(writing).
The following snippets show the necessary code changes to modify the batch
WordCount example to support streaming:
These batch WordCount snippets are from
[wordcount.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py).
This code uses the TextIO I/O connector to read from and write to a bounded
collection.
```
lines = p | 'read' >> ReadFromText(known_args.input)

...

counts = (lines
          | 'split' >> (beam.ParDo(WordExtractingDoFn())
                        .with_output_types(six.text_type))
          | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
          | 'group' >> beam.GroupByKey()
          | 'count' >> beam.Map(count_ones))

...

output = counts | 'format' >> beam.Map(format_result)

# Write the output using a "Write" transform that has side effects.
output | 'write' >> WriteToText(known_args.output)
```
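To make the transform chain concrete, the same computation can be sketched in plain Python, with no Beam required. The helpers below are local stand-ins for `WordExtractingDoFn`, `count_ones`, and `format_result` from wordcount.py, not the actual definitions:

```python
import re

lines = ["the quick brown fox", "the lazy dog"]

# 'split': emit each word in each line (what WordExtractingDoFn does)
words = [w for line in lines for w in re.findall(r"[\w']+", line)]

# 'pair_with_one': pair each word with the count 1
pairs = [(w, 1) for w in words]

# 'group': collect the ones per word (GroupByKey)
grouped = {}
for word, one in pairs:
    grouped.setdefault(word, []).append(one)

# 'count': sum the ones per word (count_ones)
counts = {word: sum(ones) for word, ones in grouped.items()}

# 'format': render one "word: count" line per word (format_result)
output = ["%s: %d" % (word, count) for word, count in sorted(counts.items())]
print(counts["the"])  # 2
```

Each stage maps one-to-one onto a labeled transform in the snippet above; in Beam the same steps are distributed and deferred rather than executed eagerly.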
These streaming WordCount snippets are from
[streaming_wordcount.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py).
This code uses an I/O connector that reads from and writes to an unbounded
source (Cloud Pub/Sub) and specifies a fixed windowing strategy.
```
lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)

...

counts = (lines
          | 'split' >> (beam.ParDo(WordExtractingDoFn())
                        .with_output_types(six.text_type))
          | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
          | beam.WindowInto(window.FixedWindows(15, 0))
          | 'group' >> beam.GroupByKey()
          | 'count' >> beam.Map(count_ones))

...

output = counts | 'format' >> beam.Map(format_result)

# Write to Pub/Sub
output | beam.io.WriteStringsToPubSub(known_args.output_topic)
```
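The key difference from the batch version is the `beam.WindowInto` step. A rough sketch of the arithmetic behind a fixed windowing strategy such as `window.FixedWindows(15, 0)` (an illustration in plain Python, not Beam's implementation): each element's timestamp falls into exactly one 15-second window, and `GroupByKey` then groups per key *and* per window:

```python
def fixed_window(timestamp, size=15, offset=0):
    """Return the (start, end) of the fixed window containing timestamp."""
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)

# Timestamped words: grouping in a windowed pipeline is by (key, window),
# so the same word can be counted separately in several windows.
events = [("hello", 3), ("hello", 14), ("hello", 17)]
per_window = {}
for word, ts in events:
    key = (word, fixed_window(ts))
    per_window[key] = per_window.get(key, 0) + 1

print(per_window)
# {('hello', (0, 15)): 2, ('hello', (15, 30)): 1}
```

This is why windowing is required for aggregations over unbounded data: it divides the never-ending stream into finite groups that `GroupByKey` can complete and emit.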
## Running a streaming pipeline
To run the example streaming WordCount pipeline, you must have a Cloud Pub/Sub
input topic and output topic. To create a topic and a subscription, and to pull
messages for testing purposes, you can use the commands in the [Cloud Pub/Sub quickstart](https://cloud.google.com/pubsub/docs/quickstart-cli).
The following simple bash script feeds lines of an input text file to your input
topic:
```
cat <YOUR_LOCAL_TEXT_FILE> | while read -r line; do
  gcloud pubsub topics publish <YOUR_INPUT_TOPIC_NAME> --message "$line"
done
```
Alternately, you can read from a publicly available Cloud Pub/Sub stream, such
as `projects/pubsub-public-data/topics/taxirides-realtime`. However, you must
create your own output topic to test writes.
The following commands run the
[streaming_wordcount.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py)
example streaming pipeline. Specify your Cloud Pub/Sub input topic
(`--input_topic`) and output topic (`--output_topic`), each qualified with its
Cloud Pub/Sub project.
{{< runner direct >}}
# DirectRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
--input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
--output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
--streaming
{{< /runner >}}
{{< runner flink >}}
See /documentation/runners/flink/ for more information.
{{< /runner >}}
{{< runner spark >}}
See /documentation/runners/spark/ for more information.
{{< /runner >}}
{{< runner dataflow >}}
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
# DataflowRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--region YOUR_GCP_REGION \
--temp_location gs://YOUR_GCS_BUCKET/tmp/ \
--input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
--output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
--streaming
{{< /runner >}}
Check your runner's documentation for any additional runner-specific information
about executing streaming pipelines:
- [DirectRunner streaming execution](/documentation/runners/direct/#streaming-execution)
- [DataflowRunner streaming execution](/documentation/runners/dataflow/#streaming-execution)
- [Portable Flink runner](/documentation/runners/flink/)