Added ability for sources to publish messages on their own just like their function counterparts (#6941)

Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 3bfd358..c4dde22 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -22,6 +22,9 @@
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.KafkaAbstractSource;
 import org.apache.pulsar.io.kafka.KafkaSourceConfig;
@@ -158,6 +161,11 @@
             public CompletableFuture<ByteBuffer> getStateAsync(String key) {
                 return null;
             }
+
+            @Override
+            public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
+                return null;
+            }
         };
         Map<String, Object> config = new HashMap<>();
         ThrowingRunnable openAndClose = ()->{