| Prompt: |
| What does this code do? |
| |
| ```java |
| package kafka; |
| |
| import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.io.kafka.KafkaIO; |
| import org.apache.beam.sdk.io.kafka.KafkaRecord; |
| import org.apache.beam.sdk.options.Description; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.kafka.common.serialization.IntegerDeserializer; |
| import org.apache.kafka.common.serialization.StringDeserializer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class ReadKafkaTopic { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ReadKafkaTopic.class); |
| |
| public interface ReadKafkaTopicOptions extends DataflowPipelineOptions { |
| @Description("Kafka Bootstrap Server") |
| String getBootstrapServer(); |
| |
| void setBootstrapServer(String value); |
| |
| @Description("Kafka Topic Name") |
| String getTopicName(); |
| |
| void setTopicName(String value); |
| } |
| |
| public static void main(String[] args) { |
| ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class); |
| |
| Pipeline p = Pipeline.create(options); |
| |
| p |
| .apply(KafkaIO.<Integer, String>read() |
| .withBootstrapServers(options.getBootstrapServer()) |
| .withTopic(options.getTopicName()) |
| .withKeyDeserializer(IntegerDeserializer.class) |
| .withValueDeserializer(StringDeserializer.class) |
| ) |
| .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer, String>, String>() { |
| @ProcessElement |
| public void processElement(ProcessContext c) { |
| KV<Integer, String> kv = c.element().getKV(); |
| String message = String.format("Message Key: %s, Message Value: %s", |
| kv.getKey(), kv.getValue()); |
| LOG.info(message); |
| c.output(message); |
| } |
| }) |
| ); |
| |
| p.run(); |
| } |
| } |
| ``` |
| |
| Response: |
| This code snippet uses the Apache Beam KafkaIO connector to read messages from a Kafka topic. |
| |
| The `ReadKafkaTopicOptions` interface specifies the Kafka bootstrap server and topic name. The `@Description` annotation provides a description of each option. When running the pipeline, use the command-line arguments `--bootstrapServer` and `--topicName` to set these parameters. |
| |
| ```java |
| public interface ReadKafkaTopicOptions extends DataflowPipelineOptions { |
| @Description("Kafka Bootstrap Server") |
| String getBootstrapServer(); |
| |
| void setBootstrapServer(String value); |
| |
| @Description("Kafka Topic Name") |
| String getTopicName(); |
| |
| void setTopicName(String value); |
| } |
| ``` |
| In the `main` method, the `PipelineOptionsFactory` class creates a `ReadKafkaTopicOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options. |
| |
| ```java |
| ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class); |
| |
| Pipeline p = Pipeline.create(options); |
| ``` |
| |
| The pipeline reads messages from the specified Kafka topic using `KafkaIO.read()`. This method configures the Kafka source by providing the bootstrap server, topic name, key deserializer, and value deserializer. Then, the pipeline applies a `ParDo` transform to log the Kafka message key and value. |
| |
| ```java |
| .apply(KafkaIO.<Integer, String>read() |
| .withBootstrapServers(options.getBootstrapServer()) |
| .withTopic(options.getTopicName()) |
| .withKeyDeserializer(IntegerDeserializer.class) |
| .withValueDeserializer(StringDeserializer.class) |
| ) |
| .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer, String>, String>() { |
| @ProcessElement |
| public void processElement(ProcessContext c) { |
| KV<Integer, String> kv = c.element().getKV(); |
| String message = String.format("Message Key: %s, Message Value: %s", |
| kv.getKey(), kv.getValue()); |
| LOG.info(message); |
| c.output(message); |
| } |
| }) |
| ); |
| ``` |
| |
| Finally, the `run` method executes the pipeline. |
| |
| ```java |
| p.run(); |
| ``` |