[pulsar-kafka] Fix KafkaProducerInterceptorWrapper handles LongSerializer (#4549)
*Motivation*
KafkaProducerInterceptorWrapper uses a LongDeserializer for retrieve deserializer
*Modifications*
Fix the bug
*Verify this change*
Add unit test to cover the convertion
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
index 99195ce..5cedef9 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
@@ -34,6 +34,7 @@
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -232,10 +233,10 @@
.getValue();
}
- private Deserializer getDeserializer(Serializer serializer) {
+ static Deserializer getDeserializer(Serializer serializer) {
if (serializer instanceof StringSerializer) {
return new StringDeserializer();
- } else if (serializer instanceof LongDeserializer) {
+ } else if (serializer instanceof LongSerializer) {
return new LongDeserializer();
} else if (serializer instanceof IntegerSerializer) {
return new IntegerDeserializer();
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
index 8da6339..aadfce8 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
@@ -19,7 +19,21 @@
package org.apache.pulsar.client.kafka.compat;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+import org.apache.kafka.common.serialization.ByteBufferSerializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.DoubleDeserializer;
+import org.apache.kafka.common.serialization.DoubleSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.ProducerInterceptor;
import org.apache.pulsar.client.impl.ProducerInterceptors;
@@ -27,7 +41,7 @@
import org.apache.pulsar.client.impl.schema.BytesSchema;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.Arrays;
@@ -38,6 +52,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
public class KafkaProducerInterceptorWrapperTest {
@@ -59,10 +74,10 @@
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) invocation.getArguments()[0];
- Assert.assertEquals(record.key(), "original key");
- Assert.assertEquals(record.value(), "original value".getBytes());
- Assert.assertEquals(record.timestamp().longValue(), timeStamp);
- Assert.assertEquals(record.partition().intValue(), partitionID);
+ assertEquals(record.key(), "original key");
+ assertEquals(record.value(), "original value".getBytes());
+ assertEquals(record.timestamp().longValue(), timeStamp);
+ assertEquals(record.partition().intValue(), partitionID);
return new ProducerRecord<String, byte[]>(topic, "processed key", "processed value".getBytes());
}
}).when(mockInterceptor1).onSend(any(ProducerRecord.class));
@@ -74,10 +89,10 @@
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) invocation.getArguments()[0];
- Assert.assertEquals(record.key(), "processed key");
- Assert.assertEquals(record.value(), "processed value".getBytes());
- Assert.assertEquals(record.timestamp().longValue(), timeStamp);
- Assert.assertEquals(record.partition().intValue(), partitionID);
+ assertEquals(record.key(), "processed key");
+ assertEquals(record.value(), "processed value".getBytes());
+ assertEquals(record.timestamp().longValue(), timeStamp);
+ assertEquals(record.partition().intValue(), partitionID);
return record;
}
}).when(mockInterceptor2).onSend(any(ProducerRecord.class));
@@ -99,4 +114,37 @@
verify(mockInterceptor2, times(1)).onSend(any(ProducerRecord.class));
}
+ @DataProvider(name = "serializers")
+ public Object[][] serializers() {
+ return new Object[][] {
+ {
+ new StringSerializer(), StringDeserializer.class
+ },
+ {
+ new LongSerializer(), LongDeserializer.class
+ },
+ {
+ new IntegerSerializer(), IntegerDeserializer.class,
+ },
+ {
+ new DoubleSerializer(), DoubleDeserializer.class,
+ },
+ {
+ new BytesSerializer(), BytesDeserializer.class
+ },
+ {
+ new ByteBufferSerializer(), ByteBufferDeserializer.class
+ },
+ {
+ new ByteArraySerializer(), ByteArrayDeserializer.class
+ }
+ };
+ }
+
+ @Test(dataProvider = "serializers")
+ public void testGetDeserializer(Serializer serializer, Class deserializerClass) {
+ Deserializer deserializer = KafkaProducerInterceptorWrapper.getDeserializer(serializer);
+ assertEquals(deserializer.getClass(), deserializerClass);
+ }
+
}