Prompt: Write the Python code to read data from Apache Kafka using Apache Beam.

Response: You can read data from Apache Kafka using the ReadFromKafka transform. The following Python code reads a topic topic from a Kafka server bootstrap_servers. The values for topic and bootstrap_servers are provided as command-line arguments. The data is logged to the console.

import logging
import apache_beam as beam

from apache_beam import Map
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

class KafkaOptions(PipelineOptions):
"""
Configure pipeline options for Kafka read transform.
"""
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--bootstrap_servers',
            default="localhost:9092",
            help='Apache Kafka bootstrap servers'
        )
        parser.add_argument(
            '--topic',
            default="your-topic",
            help='Apache Kafka topic'
        )

#parse pipeline options
options = KafkaOptions()

with beam.Pipeline(options=options) as p:
    #this pipeline reads from a Kafka topic and logs the data to the console
    output = (p | "Read from Kafka" >> ReadFromKafka(
        consumer_config={'bootstrap.servers': options.bootstrap_servers},
        topics=[options.topic],
        with_metadata=False
        )
        | "Log Data" >> Map(logging.info))

For more information about how to use the KafkaIO connector with the Python SDK for Apache Beam, see the KafkaIO connector documentation.

For samples that show common pipeline configurations to parse pipeline options from command-line arguments, see Pipeline option patterns.