Prompt:
What does this code do?
```java
package kafka;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadKafkaTopic {

    private static final Logger LOG = LoggerFactory.getLogger(ReadKafkaTopic.class);

    public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
        @Description("Kafka Bootstrap Server")
        String getBootstrapServer();

        void setBootstrapServer(String value);

        @Description("Kafka Topic Name")
        String getTopicName();

        void setTopicName(String value);
    }

    public static void main(String[] args) {
        ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);

        Pipeline p = Pipeline.create(options);

        p
            .apply(KafkaIO.<Integer, String>read()
                .withBootstrapServers(options.getBootstrapServer())
                .withTopic(options.getTopicName())
                .withKeyDeserializer(IntegerDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
            )
            .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer, String>, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    KV<Integer, String> kv = c.element().getKV();
                    String message = String.format("Message Key: %s, Message Value: %s",
                        kv.getKey(), kv.getValue());
                    LOG.info(message);
                    c.output(message);
                }
            })
            );

        p.run();
    }
}
```
Response:
This code snippet uses the Apache Beam KafkaIO connector to read messages from a Kafka topic and log the key and value of each message.

The `ReadKafkaTopicOptions` interface declares two custom pipeline options: the Kafka bootstrap server and the topic name. The `@Description` annotation attaches a human-readable description to each option. When running the pipeline, set these options with the command-line arguments `--bootstrapServer` and `--topicName`.
```java
public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
    @Description("Kafka Bootstrap Server")
    String getBootstrapServer();

    void setBootstrapServer(String value);

    @Description("Kafka Topic Name")
    String getTopicName();

    void setTopicName(String value);
}
```
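As a rough illustration of how a getter name relates to its command-line flag, the sketch below derives `--bootstrapServer` from `getBootstrapServer`. It is plain Java written for this explanation and is not Beam's own parsing code; the class `OptionFlagNames` and the method `flagFor` are hypothetical names.

```java
import java.util.List;

// Illustrative only: mimics the getter-to-flag naming convention,
// not Beam's actual implementation.
final class OptionFlagNames {

    // Converts a getter name such as "getTopicName" into the flag "--topicName".
    static String flagFor(String getterName) {
        if (!getterName.startsWith("get") || getterName.length() <= 3) {
            throw new IllegalArgumentException("Not a getter: " + getterName);
        }
        String property = getterName.substring(3);
        // Lower-case the first character to obtain the property name.
        return "--" + Character.toLowerCase(property.charAt(0)) + property.substring(1);
    }

    public static void main(String[] args) {
        for (String getter : List.of("getBootstrapServer", "getTopicName")) {
            System.out.println(getter + " -> " + flagFor(getter));
        }
    }
}
```

Under this convention, a value is typically passed in the form `--topicName=<value>` when launching the pipeline.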
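In the `main` method, `PipelineOptionsFactory.fromArgs(args)` parses the command-line arguments, `withValidation()` checks them against the options interface, and `as(ReadKafkaTopicOptions.class)` returns them as a `ReadKafkaTopicOptions` object. The `Pipeline.create` method then creates a new pipeline with these options.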
```java
ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);

Pipeline p = Pipeline.create(options);
```
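The pipeline reads messages from the specified Kafka topic using `KafkaIO.<Integer, String>read()`. The chained `with...` calls configure the Kafka source with the bootstrap server, the topic name, and the key and value deserializers, which decode keys as `Integer` and values as `String`. Each element produced by the read is a `KafkaRecord<Integer, String>`. The pipeline then applies a `ParDo` transform that extracts the key-value pair from each record, logs it as a formatted message, and outputs that message as a `String`.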
```java
.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers(options.getBootstrapServer())
    .withTopic(options.getTopicName())
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
)
.apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer, String>, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        KV<Integer, String> kv = c.element().getKV();
        String message = String.format("Message Key: %s, Message Value: %s",
            kv.getKey(), kv.getValue());
        LOG.info(message);
        c.output(message);
    }
})
);
```
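To see what the logged line looks like, the following sketch reproduces only the formatting step in plain Java, without Beam or Kafka. The class `LogMessageFormat`, the method `format`, and the sample key and value are made up for illustration; the format string is the one used in the `DoFn`.

```java
// Illustrative only: reproduces the formatting step without Beam or Kafka.
final class LogMessageFormat {

    // Uses the same format string as the DoFn in the pipeline.
    static String format(Integer key, String value) {
        return String.format("Message Key: %s, Message Value: %s", key, value);
    }

    public static void main(String[] args) {
        // prints "Message Key: 42, Message Value: hello"
        System.out.println(format(42, "hello"));
        // A null key is rendered as the text "null" by %s:
        // prints "Message Key: null, Message Value: hello"
        System.out.println(format(null, "hello"));
    }
}
```

Because `%s` renders a null argument as the text `null`, a record that reaches the `DoFn` without a key would presumably be logged with `Message Key: null` rather than causing an error at this step.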
Finally, the `run` method submits the pipeline for execution on the runner configured in the pipeline options.
```java
p.run();
```