[FLINK-24292][connectors/kafka] Use KafkaSink in examples instead of FlinkKafkaProducer
diff --git a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
index 36988b7..01e4819 100644
--- a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
+++ b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
@@ -18,17 +18,19 @@
 package org.apache.flink.streaming.kafka.test;
 
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.kafka.test.base.CustomWatermarkExtractor;
 import org.apache.flink.streaming.kafka.test.base.KafkaEvent;
 import org.apache.flink.streaming.kafka.test.base.KafkaEventSchema;
 import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil;
 import org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
+
 /**
  * A simple example that shows how to read from and write to modern Kafka. This will read String
  * messages from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key,
@@ -60,12 +62,21 @@
                         .keyBy("word")
                         .map(new RollingAdditionMapper());
 
-        input.addSink(
-                new FlinkKafkaProducer<>(
-                        parameterTool.getRequired("output-topic"),
-                        new KeyedSerializationSchemaWrapper<>(new KafkaEventSchema()),
-                        parameterTool.getProperties(),
-                        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
+        input.sinkTo(
+                KafkaSink.<KafkaEvent>builder()
+                        .setBootstrapServers(
+                                parameterTool
+                                        .getProperties()
+                                        .getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic(parameterTool.getRequired("output-topic"))
+                                        .setValueSerializationSchema(new KafkaEventSchema())
+                                        .build())
+                        .setDeliverGuarantee(
+                                org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE)
+                        .setTransactionalIdPrefix("kafka-example")
+                        .build());
 
         env.execute("Modern Kafka Example");
     }
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java
index 0d3d476..822d17c 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java
@@ -19,8 +19,10 @@
 package org.apache.flink.streaming.examples.statemachine;
 
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.examples.statemachine.event.Event;
 import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource;
 import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer;
 
@@ -47,7 +49,19 @@
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         env.addSource(new EventsGeneratorSource(errorRate, sleep))
-                .addSink(new FlinkKafkaProducer<>(brokers, kafkaTopic, new EventDeSerializer()));
+                .sinkTo(
+                        KafkaSink.<Event>builder()
+                                .setBootstrapServers(brokers)
+                                .setRecordSerializer(
+                                        KafkaRecordSerializationSchema.builder()
+                                                .setValueSerializationSchema(
+                                                        new EventDeSerializer())
+                                                .setTopic(kafkaTopic)
+                                                .build())
+                                .setDeliverGuarantee(
+                                        org.apache.flink.connector.base.DeliveryGuarantee
+                                                .AT_LEAST_ONCE)
+                                .build());
 
         // trigger program execution
         env.execute("State machine example Kafka events generator job");