[pulsar-kafka] Fix KafkaProducerInterceptorWrapper handles LongSerializer (#4549)

*Motivation*

KafkaProducerInterceptorWrapper uses a LongDeserializer for retrieve deserializer

*Modifications*

Fix the bug

*Verify this change*

Add unit test to cover the convertion
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
index 99195ce..5cedef9 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
@@ -34,6 +34,7 @@
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -232,10 +233,10 @@
                                     .getValue();
     }
 
-    private Deserializer getDeserializer(Serializer serializer) {
+    static Deserializer getDeserializer(Serializer serializer) {
         if (serializer instanceof StringSerializer) {
             return new StringDeserializer();
-        } else if (serializer instanceof LongDeserializer) {
+        } else if (serializer instanceof LongSerializer) {
             return new LongDeserializer();
         } else if (serializer instanceof IntegerSerializer) {
             return new IntegerDeserializer();
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
index 8da6339..aadfce8 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
@@ -19,7 +19,21 @@
 package org.apache.pulsar.client.kafka.compat;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+import org.apache.kafka.common.serialization.ByteBufferSerializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.DoubleDeserializer;
+import org.apache.kafka.common.serialization.DoubleSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.client.api.ProducerInterceptor;
 import org.apache.pulsar.client.impl.ProducerInterceptors;
@@ -27,7 +41,7 @@
 import org.apache.pulsar.client.impl.schema.BytesSchema;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.util.Arrays;
@@ -38,6 +52,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
 
 public class KafkaProducerInterceptorWrapperTest {
 
@@ -59,10 +74,10 @@
             @Override
             public Object answer(InvocationOnMock invocation) throws Throwable {
                 ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) invocation.getArguments()[0];
-                Assert.assertEquals(record.key(), "original key");
-                Assert.assertEquals(record.value(), "original value".getBytes());
-                Assert.assertEquals(record.timestamp().longValue(), timeStamp);
-                Assert.assertEquals(record.partition().intValue(), partitionID);
+                assertEquals(record.key(), "original key");
+                assertEquals(record.value(), "original value".getBytes());
+                assertEquals(record.timestamp().longValue(), timeStamp);
+                assertEquals(record.partition().intValue(), partitionID);
                 return new ProducerRecord<String, byte[]>(topic, "processed key", "processed value".getBytes());
             }
         }).when(mockInterceptor1).onSend(any(ProducerRecord.class));
@@ -74,10 +89,10 @@
             @Override
             public Object answer(InvocationOnMock invocation) throws Throwable {
                 ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) invocation.getArguments()[0];
-                Assert.assertEquals(record.key(), "processed key");
-                Assert.assertEquals(record.value(), "processed value".getBytes());
-                Assert.assertEquals(record.timestamp().longValue(), timeStamp);
-                Assert.assertEquals(record.partition().intValue(), partitionID);
+                assertEquals(record.key(), "processed key");
+                assertEquals(record.value(), "processed value".getBytes());
+                assertEquals(record.timestamp().longValue(), timeStamp);
+                assertEquals(record.partition().intValue(), partitionID);
                 return record;
             }
         }).when(mockInterceptor2).onSend(any(ProducerRecord.class));
@@ -99,4 +114,37 @@
         verify(mockInterceptor2, times(1)).onSend(any(ProducerRecord.class));
     }
 
+    @DataProvider(name = "serializers")
+    public Object[][] serializers() {
+        return new Object[][] {
+            {
+                new StringSerializer(), StringDeserializer.class
+            },
+            {
+                new LongSerializer(), LongDeserializer.class
+            },
+            {
+                new IntegerSerializer(), IntegerDeserializer.class,
+            },
+            {
+                new DoubleSerializer(), DoubleDeserializer.class,
+            },
+            {
+                new BytesSerializer(), BytesDeserializer.class
+            },
+            {
+                new ByteBufferSerializer(), ByteBufferDeserializer.class
+            },
+            {
+                new ByteArraySerializer(), ByteArrayDeserializer.class
+            }
+        };
+    }
+
+    @Test(dataProvider = "serializers")
+    public void testGetDeserializer(Serializer serializer, Class deserializerClass) {
+        Deserializer deserializer = KafkaProducerInterceptorWrapper.getDeserializer(serializer);
+        assertEquals(deserializer.getClass(), deserializerClass);
+    }
+
 }