Prompt:
What does this code do?
```python
import logging

import apache_beam as beam
from apache_beam import Map
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


class KafkaOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--bootstrap_servers',
            default="localhost:9092",
            help='Apache Kafka bootstrap servers'
        )
        parser.add_argument(
            '--topic',
            default="your-topic",
            help='Apache Kafka topic'
        )


options = KafkaOptions()

with beam.Pipeline(options=options) as p:
    output = (p | "Read from Kafka" >> ReadFromKafka(
        consumer_config={'bootstrap.servers': options.bootstrap_servers},
        topics=[options.topic],
        with_metadata=False
    )
    | "Log Data" >> Map(logging.info))
```
Response:
This code uses the Apache Beam [KafkaIO](https://beam.apache.org/releases/pydoc/current/apache_beam.io.kafka.html) Python connector to read data from the [Apache Kafka](https://kafka.apache.org/) topic named by the `topic` option, connecting through the Kafka bootstrap servers named by the `bootstrap_servers` option. Each record that is read is then logged with `logging.info`.
The `ReadFromKafka` transform is a cross-language transform: it is implemented in Java and made available to the Python, Go, and TypeScript SDKs through Beam's cross-language framework. The Python SDK connects to a Java expansion service to expand the transform when the pipeline is constructed, before it is submitted to a runner. For more information about cross-language transforms, see Apache Beam [Multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines).
```python
with beam.Pipeline(options=options) as p:
    output = (p | "Read from Kafka" >> ReadFromKafka(
        consumer_config={'bootstrap.servers': options.bootstrap_servers},
        topics=[options.topic],
        with_metadata=False
    )
    | "Log Data" >> Map(logging.info))
```
The required arguments for the `ReadFromKafka` transform are `consumer_config` and `topics`. You can use the `consumer_config` argument to specify the Kafka bootstrap servers, the `topics` argument to specify the Kafka topic to read data from, and the `with_metadata` argument to specify whether to include Kafka metadata in the output `PCollection`. The default value for the `with_metadata` argument is `False`.
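To illustrate what the logging step receives, here is a minimal sketch. It assumes that, with `with_metadata=False` and the connector's default byte-array deserializers, each element is a `(key, value)` tuple of raw bytes, and that the payloads are UTF-8 text. The helper name `decode_record` and the sample data are hypothetical and not part of the original code.

```python
from typing import Optional, Tuple


def decode_record(
    record: Tuple[Optional[bytes], Optional[bytes]]
) -> Tuple[Optional[str], Optional[str]]:
    """Decode a (key, value) pair of UTF-8 bytes into strings.

    Assumption: elements arrive as (key, value) byte tuples; a Kafka
    record may have no key (or no value), so None is passed through.
    """
    key, value = record
    return (
        key.decode("utf-8") if key is not None else None,
        value.decode("utf-8") if value is not None else None,
    )


# Made-up stand-in for one element of the output PCollection.
sample = (b"user-42", b'{"event": "click"}')
print(decode_record(sample))
```

In the pipeline, a step such as `| "Decode" >> Map(decode_record)` could be placed before the `"Log Data"` step so that the log shows readable strings instead of byte literals.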
The `KafkaOptions` class defines the command-line arguments `--bootstrap_servers` and `--topic`, which are used to configure the `ReadFromKafka` transform. If an argument is not supplied, its default value (`localhost:9092` or `your-topic`) is used. These arguments are parsed from the command line using [pipeline options](https://beam.apache.org/documentation/patterns/pipeline-options/).
```python
class KafkaOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--bootstrap_servers',
            default="localhost:9092",
            help='Apache Kafka bootstrap servers'
        )
        parser.add_argument(
            '--topic',
            default="your-topic",
            help='Apache Kafka topic'
        )


options = KafkaOptions()
```
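The way these arguments resolve can be sketched with the standard library alone. The example below is an approximation, not Beam's own parsing code: it assumes that the `parser` passed to `_add_argparse_args` behaves like a regular `argparse.ArgumentParser`, and the function name `add_kafka_args` is hypothetical.

```python
import argparse


def add_kafka_args(parser: argparse.ArgumentParser) -> None:
    """Register the same two flags that KafkaOptions defines."""
    parser.add_argument(
        '--bootstrap_servers',
        default="localhost:9092",
        help='Apache Kafka bootstrap servers'
    )
    parser.add_argument(
        '--topic',
        default="your-topic",
        help='Apache Kafka topic'
    )


parser = argparse.ArgumentParser()
add_kafka_args(parser)

# No flags supplied: both options fall back to their defaults.
defaults, _ = parser.parse_known_args([])

# One flag overridden; flags this parser does not know are returned
# separately instead of raising an error.
overridden, unknown = parser.parse_known_args(
    ["--topic", "orders", "--runner", "DirectRunner"]
)

print(defaults.bootstrap_servers, defaults.topic)
print(overridden.topic, unknown)
```

With the real pipeline, the equivalent override would be passed on the command line, for example `--topic orders --bootstrap_servers broker:9092`, where both values are placeholders.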
For more information about how to use the KafkaIO connector with the Python SDK for Apache Beam, see [KafkaIO module](https://beam.apache.org/releases/pydoc/current/apache_beam.io.kafka.html).