[FLINK-27174][connector/kafka] Fix checking of bootstrapServers when already provided in producer Properties
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
index e87d4dd..4783050 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
@@ -180,26 +180,33 @@
         return this;
     }
 
+    private void sanityCheck() {
+        if (kafkaProducerConfig == null) {
+            setKafkaProducerConfig(new Properties());
+        }
+        if (bootstrapServers != null) {
+            kafkaProducerConfig.setProperty(
+                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        }
+        checkNotNull(
+                kafkaProducerConfig.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
+                "bootstrapServers");
+        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            checkState(
+                    transactionalIdPrefix != null,
+                    "EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster.");
+        }
+        checkNotNull(recordSerializer, "recordSerializer");
+    }
+
     /**
      * Constructs the {@link KafkaSink} with the configured properties.
      *
      * @return {@link KafkaSink}
      */
     public KafkaSink<IN> build() {
-        if (kafkaProducerConfig == null) {
-            setKafkaProducerConfig(new Properties());
-        }
-        checkNotNull(bootstrapServers);
-        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
-            checkState(
-                    transactionalIdPrefix != null,
-                    "EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster.");
-        }
-        kafkaProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        sanityCheck();
         return new KafkaSink<>(
-                deliveryGuarantee,
-                kafkaProducerConfig,
-                transactionalIdPrefix,
-                checkNotNull(recordSerializer, "recordSerializer"));
+                deliveryGuarantee, kafkaProducerConfig, transactionalIdPrefix, recordSerializer);
     }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
new file mode 100644
index 0000000..1ad2cde
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/** Tests for {@link KafkaSinkBuilder}. */
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaSinkBuilderTest {
+
+    @Test
+    public void testBootstrapServerSettingWithProperties() {
+        Properties testConf = new Properties();
+        testConf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "testServer");
+        KafkaSinkBuilder<String> builder =
+                new KafkaSinkBuilder<String>()
+                        .setKafkaProducerConfig(testConf)
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic("topic")
+                                        .setValueSerializationSchema(new SimpleStringSchema())
+                                        .build());
+
+        assertDoesNotThrow(builder::build);
+    }
+}