Prompt: Write the Python code to read data from Cloud Spanner using Apache Beam.

Response: Your Apache Beam pipeline can read data from Cloud Spanner using the Apache Beam SpannerIO connector. The SpannerIO connector lets you read data from and write data to Cloud Spanner. Cloud Spanner is supported in both batch pipelines and streaming pipelines.

import logging
import apache_beam as beam

from apache_beam import coders
from apache_beam import Map
from apache_beam.io.gcp.spanner import ReadFromSpanner
from apache_beam.options.pipeline_options import PipelineOptions
from typing import NamedTuple

class ExampleRow(NamedTuple):
    id: int
    name: str

class SpannerOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--project_id',
            default='your-project-id',
            help='Google Cloud project ID'
        )
        parser.add_argument(
            '--instance_id',
            default='your-instance-id',
            help='Google Cloud Spanner instance ID'
        )
        parser.add_argument(
            '--database_id',
            default='your-database-id',
            help='Google Cloud Spanner database ID'
        )

options = SpannerOptions()
coders.registry.register_coder(ExampleRow, coders.RowCoder)

with beam.Pipeline(options=options) as p:

    output = (p | "Read from table" >> ReadFromSpanner(
        project_id=options.project_id,
        instance_id=options.instance_id,
        database_id=options.database_id,
        row_type=ExampleRow,
        sql="SELECT * FROM example_row"
        )
        | "Log Data" >> Map(logging.info))

The ReadFromSpanner transform is a built-in Apache Beam transform that reads data from a Cloud Spanner table. The ReadFromSpanner transform returns a PCollection of NamedTuple objects. The NamedTuple object is a Python class that represents a row in a Cloud Spanner table.

Registering a coder for NamedTuple is required to use NamedTuple as a row type:

 coders.registry.register_coder(ExampleRow, coders.RowCoder)

For more information about how to register a coder for a custom type, see Data encoding and type safety.

The SpannerOptions class defines the command-line arguments project_id, instance_id, and database_id, which are used to configure the ReadFromSpanner transform. These arguments are parsed from the command line using pipeline options.