[FLINK-24292][connectors/kafka] Use KafkaSink in examples instead of FlinkKafkaProducer
diff --git a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
index 36988b7..01e4819 100644
--- a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
+++ b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
@@ -18,17 +18,19 @@
package org.apache.flink.streaming.kafka.test;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.kafka.test.base.CustomWatermarkExtractor;
import org.apache.flink.streaming.kafka.test.base.KafkaEvent;
import org.apache.flink.streaming.kafka.test.base.KafkaEventSchema;
import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil;
import org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
/**
* A simple example that shows how to read from and write to modern Kafka. This will read String
* messages from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key,
@@ -60,12 +62,18 @@
.keyBy("word")
.map(new RollingAdditionMapper());
- input.addSink(
- new FlinkKafkaProducer<>(
- parameterTool.getRequired("output-topic"),
- new KeyedSerializationSchemaWrapper<>(new KafkaEventSchema()),
- parameterTool.getProperties(),
- FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
+ input.sinkTo(
+ KafkaSink.<KafkaEvent>builder()
+ .setBootstrapServers(
+ parameterTool
+ .getProperties()
+ .getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+ .setRecordSerializer(
+ KafkaRecordSerializationSchema.builder()
+ .setTopic(parameterTool.getRequired("output-topic"))
+ .setValueSerializationSchema(new KafkaEventSchema())
+ .build())
+ .build());
env.execute("Modern Kafka Example");
}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java
index 0d3d476..822d17c 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java
@@ -19,8 +19,10 @@
package org.apache.flink.streaming.examples.statemachine;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.examples.statemachine.event.Event;
import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource;
import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer;
@@ -47,7 +49,16 @@
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new EventsGeneratorSource(errorRate, sleep))
- .addSink(new FlinkKafkaProducer<>(brokers, kafkaTopic, new EventDeSerializer()));
+ .sinkTo(
+ KafkaSink.<Event>builder()
+ .setBootstrapServers(brokers)
+ .setRecordSerializer(
+ KafkaRecordSerializationSchema.builder()
+ .setValueSerializationSchema(
+ new EventDeSerializer())
+ .setTopic(kafkaTopic)
+ .build())
+ .build());
// trigger program execution
env.execute("State machine example Kafka events generator job");