[FLINK-24281][connectors/kafka] Migrate all format tests from FlinkKafkaProducer to KafkaSink
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index 70d5060..501d470 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -21,12 +21,15 @@
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableResult;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -36,8 +39,6 @@
 import java.util.List;
 import java.util.Properties;
 
-import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE;
-import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic.EXACTLY_ONCE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.readLines;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.waitingExpectedResults;
 
@@ -65,23 +66,8 @@
 
         // ---------- Write the Debezium json into Kafka -------------------
         List<String> lines = readLines("debezium-data-schema-exclude.txt");
-        DataStreamSource<String> stream = env.fromCollection(lines);
-        SerializationSchema<String> serSchema = new SimpleStringSchema();
-        FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
-
-        // the producer must not produce duplicates
-        Properties producerProperties = getStandardProps();
-        producerProperties.setProperty("retries", "0");
         try {
-            stream.addSink(
-                    new FlinkKafkaProducer<>(
-                            topic,
-                            serSchema,
-                            producerProperties,
-                            partitioner,
-                            EXACTLY_ONCE,
-                            DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
-            env.execute("Write sequence");
+            writeRecordsToKafka(topic, lines);
         } catch (Exception e) {
             throw new Exception("Failed to write debezium data to Kafka.", e);
         }
@@ -208,23 +194,8 @@
 
         // ---------- Write the Canal json into Kafka -------------------
         List<String> lines = readLines("canal-data.txt");
-        DataStreamSource<String> stream = env.fromCollection(lines);
-        SerializationSchema<String> serSchema = new SimpleStringSchema();
-        FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
-
-        // the producer must not produce duplicates
-        Properties producerProperties = getStandardProps();
-        producerProperties.setProperty("retries", "0");
         try {
-            stream.addSink(
-                    new FlinkKafkaProducer<>(
-                            topic,
-                            serSchema,
-                            producerProperties,
-                            partitioner,
-                            EXACTLY_ONCE,
-                            DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
-            env.execute("Write sequence");
+            writeRecordsToKafka(topic, lines);
         } catch (Exception e) {
             throw new Exception("Failed to write canal data to Kafka.", e);
         }
@@ -361,23 +332,8 @@
 
         // ---------- Write the Maxwell json into Kafka -------------------
         List<String> lines = readLines("maxwell-data.txt");
-        DataStreamSource<String> stream = env.fromCollection(lines);
-        SerializationSchema<String> serSchema = new SimpleStringSchema();
-        FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
-
-        // the producer must not produce duplicates
-        Properties producerProperties = getStandardProps();
-        producerProperties.setProperty("retries", "0");
         try {
-            stream.addSink(
-                    new FlinkKafkaProducer<>(
-                            topic,
-                            serSchema,
-                            producerProperties,
-                            partitioner,
-                            EXACTLY_ONCE,
-                            DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
-            env.execute("Write sequence");
+            writeRecordsToKafka(topic, lines);
         } catch (Exception e) {
             throw new Exception("Failed to write maxwell data to Kafka.", e);
         }
@@ -492,4 +448,28 @@
         tableResult.getJobClient().get().cancel().get(); // stop the job
         deleteTestTopic(topic);
     }
+
+    private void writeRecordsToKafka(String topic, List<String> lines) throws Exception {
+        DataStreamSource<String> stream = env.fromCollection(lines);
+        SerializationSchema<String> serSchema = new SimpleStringSchema();
+        FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
+
+        // the records must be written exactly once, otherwise the changelog ingestion would
+        // observe duplicates; this is enforced by the EXACTLY_ONCE delivery guarantee below,
+        // so the standard producer properties are only needed to look up the bootstrap servers
+        Properties producerProperties = getStandardProps();
+        stream.sinkTo(
+                KafkaSink.<String>builder()
+                        .setBootstrapServers(
+                                producerProperties.getProperty(
+                                        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic(topic)
+                                        .setValueSerializationSchema(serSchema)
+                                        .setPartitioner(partitioner)
+                                        .build())
+                        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+                        .build());
+        env.execute("Write sequence");
+    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index a562ec7..28c15a8 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -76,6 +76,9 @@
             }.withEmbeddedZookeeper()
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
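+                    // The exactly-once KafkaSink writes with transactional producers whose
+                    // transaction.timeout.ms must not exceed the broker's
+                    // transaction.max.timeout.ms, so raise the broker-side limit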
+                    .withEnv(
+                            "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
+                            String.valueOf(Duration.ofHours(2).toMillis()))
                     // Disable log deletion to prevent records from being deleted during test run
                     .withEnv("KAFKA_LOG_RETENTION_MS", "-1");
 
diff --git a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
index 840aa9e..e3f4149 100644
--- a/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
+++ b/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
@@ -20,16 +20,18 @@
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 
 import example.avro.User;
 import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.kafka.clients.producer.ProducerConfig;
 
 import java.util.Properties;
 
@@ -74,22 +76,36 @@
         SingleOutputStreamOperator<String> mapToString =
                 input.map((MapFunction<User, String>) SpecificRecordBase::toString);
 
-        FlinkKafkaProducer<String> stringFlinkKafkaProducer =
-                new FlinkKafkaProducer<>(
-                        parameterTool.getRequired("output-string-topic"),
-                        new SimpleStringSchema(),
-                        config);
-        mapToString.addSink(stringFlinkKafkaProducer);
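+        // write the string representation of the User records to the string output topic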
+        KafkaSink<String> stringSink =
+                KafkaSink.<String>builder()
+                        .setBootstrapServers(
+                                config.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setValueSerializationSchema(new SimpleStringSchema())
+                                        .setTopic(parameterTool.getRequired("output-string-topic"))
+                                        .build())
+                        .setKafkaProducerConfig(config)
+                        .build();
+        mapToString.sinkTo(stringSink);
 
-        FlinkKafkaProducer<User> avroFlinkKafkaProducer =
-                new FlinkKafkaProducer<>(
-                        parameterTool.getRequired("output-avro-topic"),
-                        ConfluentRegistryAvroSerializationSchema.forSpecific(
-                                User.class,
-                                parameterTool.getRequired("output-subject"),
-                                schemaRegistryUrl),
-                        config);
-        input.addSink(avroFlinkKafkaProducer);
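+        // the Confluent registry serialization schema handles the schema registry interaction,
+        // so the Avro sink itself only needs the bootstrap servers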
+        KafkaSink<User> avroSink =
+                KafkaSink.<User>builder()
+                        .setBootstrapServers(
+                                config.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setValueSerializationSchema(
+                                                ConfluentRegistryAvroSerializationSchema
+                                                        .forSpecific(
+                                                                User.class,
+                                                                parameterTool.getRequired(
+                                                                        "output-subject"),
+                                                                schemaRegistryUrl))
+                                        .setTopic(parameterTool.getRequired("output-avro-topic"))
+                                        .build())
+                        .build();
+        input.sinkTo(avroSink);
 
         env.execute("Kafka Confluent Schema Registry AVRO Example");
     }