Prompt: Write the Python code to read messages from a Pub/Sub subscription.

Response: Your Apache Beam pipeline can read messages from a Pub/Sub subscription or topic using the ReadFromPubSub transform. Reading from Pub/Sub with ReadFromPubSub is supported only in streaming pipelines.

The following Python code reads messages from a Pub/Sub subscription and logs them to the console. The subscription is passed on the command line with the --subscription flag, in the form projects/<PROJECT>/subscriptions/<SUBSCRIPTION>:

```python
import logging

import apache_beam as beam
from apache_beam import Map
from apache_beam.io import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions


class PubSubReadOptions(PipelineOptions):
  """Configure pipeline options for Pub/Sub read transform."""

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        "--subscription",
        required=True,
        help="Pub/Sub subscription to read from, in the form "
        "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>.")


def read_subscription():
  """Read from Pub/Sub subscription function."""

  # Parse the pipeline options from the command line.
  # Reading from Pub/Sub requires a streaming pipeline, so set streaming=True.
  options = PubSubReadOptions(streaming=True)

  with beam.Pipeline(options=options) as p:
    # This pipeline reads from a Pub/Sub subscription and logs the messages to the console.
    (p
     | "Read Pub/Sub subscription" >> ReadFromPubSub(subscription=options.subscription)
     | "Format message" >> Map(lambda message: f"Received message:\n{message}\n")
     | "Log message" >> Map(logging.info))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  read_subscription()
```

ReadFromPubSub can also read messages directly from a topic: pass the topic parameter instead of subscription (the two parameters are mutually exclusive), and a temporary subscription is created automatically.

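By default (with_attributes=False), ReadFromPubSub outputs each message payload as a byte string. Set with_attributes=True to receive PubsubMessage objects instead, which carry both the payload (data) and the message attributes (attributes).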

For more information, see the Pub/Sub I/O transform documentation.

For a common pattern for configuring pipeline options, see Pipeline option patterns.