Prompt: Write the Python code to read messages from a Pub/Sub subscription.
Response: Your Apache Beam pipeline can read messages from a Pub/Sub subscription or topic using the ReadFromPubSub transform. ReadFromPubSub is supported only in streaming pipelines.
The following Python code reads messages from a Pub/Sub subscription and logs the data to the console. The subscription is provided as the --subscription command-line argument:
```python
import logging

import apache_beam as beam
from apache_beam import Map
from apache_beam.io import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions


class PubSubReadOptions(PipelineOptions):
    """Configure pipeline options for the Pub/Sub read transform."""

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            "--subscription",
            required=True,
            help="Pub/Sub subscription to read from, in the form "
                 "projects/<project>/subscriptions/<subscription>.")


def read_subscription():
    """Read messages from a Pub/Sub subscription and log them."""

    # Parse the pipeline options from the command line.
    # For streaming pipelines, you need to include streaming=True.
    options = PubSubReadOptions(streaming=True)

    with beam.Pipeline(options=options) as p:
        # This pipeline reads from a Pub/Sub subscription and logs the messages to the console.
        # with_attributes defaults to False, so each element is the message payload as bytes.
        (p
         | "Read Pub/Sub subscription" >> ReadFromPubSub(subscription=options.subscription)
         | "Format message" >> Map(lambda message: f"Received message:\n{message}\n")
         | "Log message" >> Map(logging.info))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    read_subscription()
```
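Apache Beam also supports reading messages directly from a topic. To do this, pass the topic parameter to ReadFromPubSub instead of subscription; Beam then creates a temporary subscription on that topic automatically. Provide exactly one of the two parameters.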
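The with_attributes parameter controls the type of the returned elements. By default (with_attributes=False), each element is the message payload as a byte string. With with_attributes=True, each element is a PubsubMessage object that carries both the payload and the message attributes.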
For more information, see the Pub/Sub I/O transform documentation.
For a common pattern for configuring pipeline options, see Pipeline option patterns.