Prompt: Write the Python code to read data from BigQuery using Apache Beam.

Response: Your Apache Beam pipeline can read data from BigQuery using the ReadFromBigQuery transform. The following Python code reads rows from a BigQuery table and logs each one to the console. The table name is provided as the command-line argument --table.

import logging

import apache_beam as beam
from apache_beam.io import ReadFromBigQuery
from apache_beam.options.pipeline_options import PipelineOptions

class BigQueryReadOptions(PipelineOptions):
    """
    Configure pipeline options for BigQuery read transform.
    """
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            "--table",
            required=True,
            help="BigQuery table to read from.")

def read_table():
    """Read from BigQuery table function."""

    # Parse pipeline options from the command line.
    # ReadFromBigQuery is a bounded (batch) source, so streaming mode is not needed.
    options = BigQueryReadOptions()

    with beam.Pipeline(options=options) as p:
        # This pipeline reads from a BigQuery table and logs each row to the console.
        (p | "Read BigQuery table" >> ReadFromBigQuery(table=options.table)
         | "Format data" >> beam.Map(lambda row: f"Received row:\n{row}\n")
         | "Log data" >> beam.Map(logging.info))

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    read_table()

The ReadFromBigQuery transform can also read the results of a SQL query instead of a whole table. The following Python snippet runs a query and logs each result row to the console. The query is hardcoded here; it could instead be supplied as a command-line argument, in the same way as --table above.

with beam.Pipeline(options=options) as p:
    # 'table' is a placeholder; replace it with a real table reference.
    (p | "Read BigQuery query" >> ReadFromBigQuery(query='SELECT * FROM table')
     | "Format data" >> beam.Map(lambda row: f"Received row:\n{row}\n")
     | "Log data" >> beam.Map(logging.info))

For more information, see the BigQuery I/O connector documentation.

For samples that show common pipeline configurations, see Pipeline option patterns.