---
layout: section
title: "Apache Beam Python Streaming Pipelines"
section_menu: section-menu/sdks.html
permalink: /documentation/sdks/python-streaming/
---

# Python Streaming Pipelines

Python streaming pipeline execution became available (with some limitations) starting with Beam SDK version 2.5.0.
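You can confirm which SDK version is installed before relying on streaming support:

```py
import apache_beam

# Streaming execution requires Beam SDK 2.5.0 or later.
print(apache_beam.__version__)
```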

## Why use streaming execution?

Beam creates an unbounded PCollection if your pipeline reads from a streaming or continuously updating data source (such as Cloud Pub/Sub). A runner must process an unbounded PCollection using a streaming job that runs continuously, because the entire collection is never available for processing at any one time. [Size and boundedness]({{ site.baseurl }}/documentation/programming-guide/#size-and-boundedness) has more information about bounded and unbounded collections.

## Modifying a pipeline to use stream processing

To modify a batch pipeline to support streaming, you must make the following code changes:

* Use an I/O connector that supports reading from an unbounded source.
* Use an I/O connector that supports writing to an unbounded sink.
* Choose a [windowing strategy]({{ site.baseurl }}/documentation/programming-guide/index.html#windowing).

The Beam SDK for Python includes two I/O connectors that support unbounded PCollections: Google Cloud Pub/Sub (reading and writing) and Google BigQuery (writing).
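For orientation, here is a minimal sketch of how these connectors appear in a pipeline. The project, topic, and table names are placeholders, and the one-field BigQuery schema is only illustrative:

```py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
  # Pub/Sub: read an unbounded PCollection of messages.
  messages = p | beam.io.ReadFromPubSub(topic='projects/PROJECT/topics/INPUT')

  # Pub/Sub: write an unbounded PCollection of strings.
  messages | 'to pubsub' >> beam.io.WriteStringsToPubSub('projects/PROJECT/topics/OUTPUT')

  # BigQuery: streaming writes to a table (reading is not supported in streaming).
  rows = messages | 'to rows' >> beam.Map(lambda msg: {'line': msg})
  rows | 'to bq' >> beam.io.WriteToBigQuery('PROJECT:DATASET.TABLE', schema='line:STRING')
```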

The following snippets show the necessary code changes to modify the batch WordCount example to support streaming:

These batch WordCount snippets are from wordcount.py. This code uses the TextIO I/O connector to read from and write to a bounded collection.

```py
lines = p | 'read' >> ReadFromText(known_args.input)

...

counts = (lines
          | 'split' >> (beam.ParDo(WordExtractingDoFn())
                        .with_output_types(six.text_type))
          | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
          | 'group' >> beam.GroupByKey()
          | 'count' >> beam.Map(count_ones))

...

output = counts | 'format' >> beam.Map(format_result)

# Write the output using a "Write" transform that has side effects.
output | 'write' >> WriteToText(known_args.output)
```
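Both snippets reference a few helpers defined elsewhere in wordcount.py. A minimal sketch of what they might look like (the names match the snippets; the bodies are illustrative):

```py
import re
import apache_beam as beam

class WordExtractingDoFn(beam.DoFn):
  """Splits a line of text into individual words."""
  def process(self, element):
    return re.findall(r"[\w\']+", element)

def count_ones(word_ones):
  """Sums the ones paired with a word by GroupByKey."""
  (word, ones) = word_ones
  return (word, sum(ones))

def format_result(word_count):
  """Formats a (word, count) pair as a printable string."""
  (word, count) = word_count
  return '%s: %d' % (word, count)
```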

These streaming WordCount snippets are from streaming_wordcount.py. This code uses an I/O connector that reads from and writes to an unbounded source (Cloud Pub/Sub) and specifies a fixed windowing strategy.

```py
lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)

...

counts = (lines
          | 'split' >> (beam.ParDo(WordExtractingDoFn())
                        .with_output_types(six.text_type))
          | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
          | beam.WindowInto(window.FixedWindows(15, 0))
          | 'group' >> beam.GroupByKey()
          | 'count' >> beam.Map(count_ones))

...

output = counts | 'format' >> beam.Map(format_result)

# Write to Pub/Sub
output | beam.io.WriteStringsToPubSub(known_args.output_topic)
```
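Here, `window.FixedWindows(15, 0)` assigns each element to a 15-second window based on its event-time timestamp (the second argument is an offset of zero seconds). You can observe the same windowing behavior on bounded data by attaching explicit timestamps; a small illustrative sketch:

```py
import apache_beam as beam
from apache_beam.transforms import window

with beam.Pipeline() as p:
  (p
   | beam.Create([('cat', 2), ('cat', 17), ('dog', 4)])
   # Interpret each pair's number as its event-time timestamp, in seconds.
   | beam.Map(lambda kv: window.TimestampedValue(kv, kv[1]))
   | beam.WindowInto(window.FixedWindows(15))  # windows [0, 15), [15, 30), ...
   | beam.GroupByKey()
   # ('cat', 2) and ('cat', 17) fall into different windows, so they are
   # grouped separately rather than into a single ('cat', [2, 17]).
   | beam.Map(print))
```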

## Running a streaming pipeline

To run the example streaming WordCount pipeline, you must have a Cloud Pub/Sub input topic and output topic. To create, subscribe to, and pull from a topic for testing purposes, you can use the commands in the Cloud Pub/Sub quickstart.
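For reference, that setup might look like the following with the gcloud command-line tool (the topic and subscription names are placeholders):

```
# Create input and output topics, plus a subscription for inspecting the output.
gcloud pubsub topics create YOUR_INPUT_TOPIC
gcloud pubsub topics create YOUR_OUTPUT_TOPIC
gcloud pubsub subscriptions create YOUR_OUTPUT_SUBSCRIPTION --topic YOUR_OUTPUT_TOPIC

# Pull messages from the output subscription to check results.
gcloud pubsub subscriptions pull YOUR_OUTPUT_SUBSCRIPTION --auto-ack
```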

The following simple bash script feeds lines of an input text file to your input topic:

```
cat <YOUR_LOCAL_TEXT_FILE> | while read -r line; do
  gcloud pubsub topics publish <YOUR_INPUT_TOPIC_NAME> --message "$line"
done
```
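If you prefer Python, the same feed can be produced with the google-cloud-pubsub client library (a separate package, not part of the Beam SDK; the file, project, and topic names below are placeholders):

```py
from google.cloud import pubsub_v1  # pip install google-cloud-pubsub

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('YOUR_PUBSUB_PROJECT_NAME', 'YOUR_INPUT_TOPIC')

with open('YOUR_LOCAL_TEXT_FILE') as f:
  for line in f:
    # Pub/Sub message payloads are bytes.
    publisher.publish(topic_path, line.rstrip('\n').encode('utf-8'))
```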

Alternatively, you can read from a publicly available Cloud Pub/Sub stream, such as projects/pubsub-public-data/topics/taxirides-realtime. However, you must create your own output topic to test writes.
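Reading from the public stream only requires pointing the source at that topic:

```py
lines = p | beam.io.ReadFromPubSub(
    topic='projects/pubsub-public-data/topics/taxirides-realtime')
```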

The following commands run the streaming_wordcount.py example streaming pipeline. Specify your Cloud Pub/Sub project and input topic (`--input_topic`), as well as your output project and topic (`--output_topic`).

{:.runner-direct}

```
# DirectRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
```

{:.runner-apex}

This runner is not yet available for the Python SDK.

{:.runner-flink-local}

This runner is not yet available for the Python SDK.

{:.runner-flink-cluster}

This runner is not yet available for the Python SDK.

{:.runner-spark}

This runner is not yet available for the Python SDK.

{:.runner-dataflow}

```
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]

# DataflowRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
  --runner DataflowRunner \
  --project YOUR_GCP_PROJECT \
  --temp_location gs://YOUR_GCS_BUCKET/tmp/ \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
```
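A streaming job runs until it is explicitly stopped. When you finish testing, drain or cancel the job so it does not keep running (for example, with the gcloud CLI):

```
# Find the job ID, then stop the job.
gcloud dataflow jobs list --status=active
gcloud dataflow jobs drain YOUR_JOB_ID   # stop after processing buffered data
gcloud dataflow jobs cancel YOUR_JOB_ID  # stop immediately
```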

Check your runner's documentation for any additional runner-specific information about executing streaming pipelines:

* [DirectRunner streaming execution]({{ site.baseurl }}/documentation/runners/direct/#streaming-execution)
* [DataflowRunner streaming execution]({{ site.baseurl }}/documentation/runners/dataflow/#streaming-execution)

## Unsupported features

Python streaming execution does not currently support the following features.

### General Beam features

These unsupported Beam features apply to all runners.

* State and Timers APIs
* Custom source API
* Splittable DoFn API
* Handling of late data
* User-defined custom merging WindowFn (with fnapi)

### DataflowRunner specific features

Additionally, DataflowRunner does not currently support the following Cloud Dataflow-specific features with Python streaming execution.

* Cloud Dataflow Templates