Fix:PulsarKafkaProducer is not thread safe (#4745)
fix #4707
(cherry picked from commit 0362944f6f7d610dd5b54d3724958e0bb9cb7abc)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 8491a15..aba5d1d 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -253,7 +253,9 @@
private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
try {
// Add the partitions info for the new topic
- cluster = cluster.withPartitions(readPartitionsInfo(topic));
+ synchronized (this){
+ cluster = cluster.withPartitions(readPartitionsInfo(topic));
+ }
List<org.apache.pulsar.client.api.ProducerInterceptor> wrappedInterceptors = interceptors.stream()
.map(interceptor -> new KafkaProducerInterceptorWrapper(interceptor, keySchema, valueSchema, topic))
.collect(Collectors.toList());
diff --git a/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/PulsarKafkaProducerThreadSafeTest.java b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/PulsarKafkaProducerThreadSafeTest.java
new file mode 100644
index 0000000..1246b1d
--- /dev/null
+++ b/tests/pulsar-kafka-compat-client-test/src/test/java/org/apache/pulsar/tests/integration/compat/kafka/PulsarKafkaProducerThreadSafeTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.compat.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.PulsarKafkaProducer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+import java.util.Properties;
+/**
+ * A test that tests if {@link PulsarKafkaProducer} is thread safe.
+ */
+public class PulsarKafkaProducerThreadSafeTest extends PulsarStandaloneTestSuite {
+ private Producer producer;
+
+ private static String getPlainTextServiceUrl() {
+ return container.getPlainTextServiceUrl();
+ }
+
+ @BeforeTest
+ private void setup() {
+ Properties producerProperties = new Properties();
+ producerProperties.put("bootstrap.servers", getPlainTextServiceUrl());
+ producerProperties.put("key.serializer", IntegerSerializer.class.getName());
+ producerProperties.put("value.serializer", StringSerializer.class.getName());
+ producer = new KafkaProducer<>(producerProperties);
+ }
+
+ /**
+ * This test run 10 times in threadPool witch size is 5.
+ * Different threads have same producer and different topics witch is based on thread time.
+ * This test will be failed when producer failed to send if PulsarKafkaProducer is not thread safe.
+ */
+ @Test(threadPoolSize = 5, invocationCount = 10)
+ public void testPulsarKafkaProducerThreadSafe() {
+ String topic1 = "persistent://public/default/topic-" + System.currentTimeMillis();
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic1, "Hello");
+ producer.send(record);
+ }
+}