expose new message with different schema (#5517)

Master Issue: #5141 

Expose new message with different schema interface, which not required same parameterized type with the producer.
Since the producer and messages sent by it may have different inner types, it's better to have a type agnostic producer interceptor with a checkin method.
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 0dc8c91..21a4f8d 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -142,7 +142,8 @@
         doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
         doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
         doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
-        doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(any());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(
+                (org.apache.pulsar.client.api.ProducerInterceptor) any());
         doReturn(mockProducer).when(mockProducerBuilder).create();
         doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
         doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
@@ -168,7 +169,8 @@
         pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,"key", "value"));
 
         // Verify
-        verify(mockProducerBuilder, times(1)).intercept(any());
+        verify(mockProducerBuilder, times(1)).intercept(
+                (org.apache.pulsar.client.api.ProducerInterceptor)any());
     }
 
     @Test
@@ -190,7 +192,8 @@
         doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
         doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
         doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
-        doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(any());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(
+                (org.apache.pulsar.client.api.ProducerInterceptor) any());
         doReturn(mockProducer).when(mockProducerBuilder).create();
         doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
         doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
@@ -227,7 +230,8 @@
 
         // Verify
         verify(mockTypedMessageBuilder, times(1)).sendAsync();
-        verify(mockProducerBuilder, times(1)).intercept(any());
+        verify(mockProducerBuilder, times(1)).intercept(
+                (org.apache.pulsar.client.api.ProducerInterceptor) any());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
index f96c9c7..eea92c4 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.kafka.compat;
 
+import java.util.stream.Collectors;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -37,6 +38,7 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.client.api.ProducerInterceptor;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.interceptor.ProducerInterceptorWrapper;
 import org.apache.pulsar.client.impl.ProducerInterceptors;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.schema.BytesSchema;
@@ -101,9 +103,11 @@
 
         Schema<String> pulsarKeySerializeSchema = new PulsarKafkaSchema<>(new StringSerializer());
         Schema<byte[]> pulsarValueSerializeSchema = new PulsarKafkaSchema<>(new ByteArraySerializer());
-        ProducerInterceptors producerInterceptors = new ProducerInterceptors(Arrays.asList(new ProducerInterceptor[]{
-                new KafkaProducerInterceptorWrapper(mockInterceptor1, pulsarKeySerializeSchema, pulsarValueSerializeSchema, topic),
-                new KafkaProducerInterceptorWrapper(mockInterceptor2, pulsarKeySerializeSchema, pulsarValueSerializeSchema, topic)}));
+        ProducerInterceptors producerInterceptors = new ProducerInterceptors(
+                Arrays.stream(new ProducerInterceptor[]{
+                        new KafkaProducerInterceptorWrapper(mockInterceptor1, pulsarKeySerializeSchema, pulsarValueSerializeSchema, topic),
+                        new KafkaProducerInterceptorWrapper(mockInterceptor2, pulsarKeySerializeSchema, pulsarValueSerializeSchema, topic)}).map(
+                        ProducerInterceptorWrapper::new).collect(Collectors.toList()));
 
         TypedMessageBuilderImpl typedMessageBuilder = new TypedMessageBuilderImpl(null, new BytesSchema());
         typedMessageBuilder.key("original key");