Prompt: Write the Python code to read data from Cloud Spanner using Apache Beam.
Response: Your Apache Beam pipeline can read data from Cloud Spanner using the SpannerIO connector, which supports both reading from and writing to Cloud Spanner. Cloud Spanner is supported in both batch and streaming pipelines.
import logging
from typing import NamedTuple

import apache_beam as beam
from apache_beam import Map
from apache_beam import coders
from apache_beam.io.gcp.spanner import ReadFromSpanner
from apache_beam.options.pipeline_options import PipelineOptions


# Row type for the query results: one field per selected column.
class ExampleRow(NamedTuple):
    id: int
    name: str


# Custom pipeline options that expose the Spanner connection settings as flags.
class SpannerOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--project_id',
            default='your-project-id',
            help='Google Cloud project ID'
        )
        parser.add_argument(
            '--instance_id',
            default='your-instance-id',
            help='Google Cloud Spanner instance ID'
        )
        parser.add_argument(
            '--database_id',
            default='your-database-id',
            help='Google Cloud Spanner database ID'
        )


# Show INFO-level messages so the rows logged below are visible.
logging.getLogger().setLevel(logging.INFO)

options = SpannerOptions()

# Register a coder so that Beam can encode ExampleRow elements.
coders.registry.register_coder(ExampleRow, coders.RowCoder)

with beam.Pipeline(options=options) as p:
    output = (
        p
        | "Read from table" >> ReadFromSpanner(
            project_id=options.project_id,
            instance_id=options.instance_id,
            database_id=options.database_id,
            row_type=ExampleRow,
            sql="SELECT * FROM example_row"
        )
        | "Log Data" >> Map(logging.info)
    )
The ReadFromSpanner transform is a built-in Apache Beam transform that reads rows from Cloud Spanner, in this example through a SQL query. It returns a PCollection whose elements are instances of the NamedTuple class passed as row_type (here, ExampleRow). Each instance represents one row of the result, with one field per column.
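To make the row type concrete, here is a minimal sketch that uses only the Python standard library and does not connect to Spanner. It shows how a NamedTuple such as ExampleRow exposes a row's columns as named fields. The sample values are invented for illustration.

```python
from typing import NamedTuple


class ExampleRow(NamedTuple):
    """One row of the example table; field names mirror the column names."""
    id: int
    name: str


# Hypothetical sample values standing in for a row read from Spanner.
row = ExampleRow(id=1, name="Ada")

# Columns can be read by field name or by position.
row_id = row.id
row_name = row[1]

# _asdict() returns a mapping from field name to value, handy for logging.
row_dict = row._asdict()

# The declared field names are available on the class itself.
field_names = ExampleRow._fields
```

Downstream transforms in the pipeline can read fields the same way, for example with a Map step that accesses row.name.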
To use a NamedTuple as the row type, you must register a coder for it:
coders.registry.register_coder(ExampleRow, coders.RowCoder)
For more information about how to register a coder for a custom type, see Data encoding and type safety.
The SpannerOptions class defines the command-line arguments project_id, instance_id, and database_id, which are used to configure the ReadFromSpanner transform. These arguments are parsed from the command line using pipeline options.
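The flag handling described above can be approximated without Beam. PipelineOptions builds on Python's argparse, which is why _add_argparse_args receives a parser. The sketch below uses plain argparse to show how the three flags resolve to values or fall back to their defaults. The helper name parse_spanner_args and the sample flag values are assumptions made for illustration, not part of the Beam API.

```python
import argparse


def parse_spanner_args(argv):
    """Parse the three Spanner flags from argv (hypothetical helper)."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--project_id",
        default="your-project-id",
        help="Google Cloud project ID",
    )
    parser.add_argument(
        "--instance_id",
        default="your-instance-id",
        help="Google Cloud Spanner instance ID",
    )
    parser.add_argument(
        "--database_id",
        default="your-database-id",
        help="Google Cloud Spanner database ID",
    )
    # parse_known_args ignores flags that belong to other option groups,
    # such as --runner, instead of raising an error.
    known, _unknown = parser.parse_known_args(argv)
    return known


# Sample invocation with made-up values; --database_id is omitted on purpose.
args = parse_spanner_args([
    "--project_id", "my-project",
    "--instance_id=my-instance",
    "--runner=DirectRunner",
])
```

Here args.project_id and args.instance_id take the values given on the command line, while args.database_id falls back to its default. In the real pipeline the same flags are passed when launching the script, and SpannerOptions exposes them as attributes in the same way.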