[fix][io][branch-2.10] Not restart instance when kafka source poll exception. (#20816)

diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 3933375..2c4c3dd 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -26,7 +26,6 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -43,7 +42,6 @@
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.functions.api.KVRecord;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,7 +49,7 @@
 /**
  * Simple Kafka Source to transfer messages from a Kafka topic.
  */
-public abstract class KafkaAbstractSource<V> extends PushSource<V> {
+public abstract class KafkaAbstractSource<V> extends KafkaPushSource<V> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaAbstractSource.class);
 
@@ -126,7 +124,6 @@
             throw new IllegalArgumentException("Unable to instantiate Kafka consumer", ex);
         }
         this.start();
-        running = true;
     }
 
     protected Properties beforeCreateConsumer(Properties props) {
@@ -151,47 +148,36 @@
 
     @SuppressWarnings("unchecked")
     public void start() {
+        LOG.info("Starting subscribe kafka source on {}", kafkaSourceConfig.getTopic());
+        consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
         runnerThread = new Thread(() -> {
-            LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
-            consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
             while (running) {
-                ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
-                CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
-                int index = 0;
-                for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
-                    KafkaRecord record = buildRecord(consumerRecord);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema());
+                try {
+                    ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
+                    CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
+                    int index = 0;
+                    for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
+                        KafkaRecord record = buildRecord(consumerRecord);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema());
+                        }
+                        consume(record);
+                        futures[index] = record.getCompletableFuture();
+                        index++;
                     }
-                    consume(record);
-                    futures[index] = record.getCompletableFuture();
-                    index++;
-                }
-                if (!kafkaSourceConfig.isAutoCommitEnabled()) {
-                    try {
+                    if (!kafkaSourceConfig.isAutoCommitEnabled()) {
                         CompletableFuture.allOf(futures).get();
                         consumer.commitSync();
-                    } catch (InterruptedException ex) {
-                        break;
-                    } catch (ExecutionException ex) {
-                        LOG.error("Error while processing records", ex);
-                        break;
                     }
+                } catch (Exception e) {
+                    LOG.error("Error while processing records", e);
+                    notifyError(e);
+                    break;
                 }
             }
         });
-        runnerThread.setUncaughtExceptionHandler(
-                (t, e) -> {
-                    new Thread(() -> {
-                        LOG.error("[{}] Error while consuming records", t.getName(), e);
-                        try {
-                            this.close();
-                        } catch (Exception ex) {
-                            LOG.error("[{}] Close kafka source error", t.getName(), e);
-                        }
-                    }, "Kafka Source Close Task Thread").start();
-                });
+        running = true;
         runnerThread.setName("Kafka Source Thread");
         runnerThread.start();
     }
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaPushSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaPushSource.java
new file mode 100644
index 0000000..d0d92da
--- /dev/null
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaPushSource.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+
+/**
+ * Kafka Push Source.
+ * To maintain compatibility, we can't pick the PIP-281: https://github.com/apache/pulsar/pull/20807
+ * cherry-pick to the historical version, so the class is implemented in the kafka connector.
+ */
+public abstract class KafkaPushSource<T> implements Source<T> {
+
+    private static class NullRecord implements Record {
+        @Override
+        public Object getValue() {
+            return null;
+        }
+    }
+
+    private static class ErrorNotifierRecord implements Record {
+        private Exception e;
+        public ErrorNotifierRecord(Exception e) {
+            this.e = e;
+        }
+        @Override
+        public Object getValue() {
+            return null;
+        }
+
+        public Exception getException() {
+            return e;
+        }
+    }
+
+    private LinkedBlockingQueue<Record<T>> queue;
+    private static final int DEFAULT_QUEUE_LENGTH = 1000;
+    private final NullRecord nullRecord = new NullRecord();
+
+    public KafkaPushSource() {
+        this.queue = new LinkedBlockingQueue<>(this.getQueueLength());
+    }
+
+    @Override
+    public Record<T> read() throws Exception {
+        Record<T> record = queue.take();
+        if (record instanceof ErrorNotifierRecord) {
+            throw ((ErrorNotifierRecord) record).getException();
+        }
+        if (record instanceof NullRecord) {
+            return null;
+        } else {
+            return record;
+        }
+    }
+
+    /**
+     * Send this message to be written to Pulsar.
+     * Pass null if you you are done with this task
+     * @param record next message from source which should be sent to a Pulsar topic
+     */
+    public void consume(Record<T> record) {
+        try {
+            if (record != null) {
+                queue.put(record);
+            } else {
+                queue.put(nullRecord);
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Get length of the queue that records are push onto.
+     * Users can override this method to customize the queue length
+     * @return queue length
+     */
+    public int getQueueLength() {
+        return DEFAULT_QUEUE_LENGTH;
+    }
+
+    /**
+     * Allows the source to notify errors asynchronously.
+     * @param ex
+     */
+    public void notifyError(Exception ex) {
+        consume(new ErrorNotifierRecord(ex));
+    }
+}
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index ee573ca..54e7ac8 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -21,6 +21,7 @@
 
 
 import com.google.common.collect.ImmutableMap;
+import java.time.Duration;
 import java.util.Collection;
 import java.lang.reflect.Field;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -31,7 +32,6 @@
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.KafkaAbstractSource;
 import org.apache.pulsar.io.kafka.KafkaSourceConfig;
-import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -158,26 +158,47 @@
         assertEquals(config.getSslTruststorePassword(), "cert_pwd");
     }
 
-    @Test
-    public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws Exception {
+    @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Subscribe exception")
+    public final void throwExceptionBySubscribe() throws Exception {
         KafkaAbstractSource source = new DummySource();
+
+        KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+        kafkaSourceConfig.setTopic("test-topic");
+        Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+        kafkaSourceConfigField.setAccessible(true);
+        kafkaSourceConfigField.set(source, kafkaSourceConfig);
+
         Consumer consumer = mock(Consumer.class);
-        Mockito.doThrow(new RuntimeException("Uncaught exception")).when(consumer)
+        Mockito.doThrow(new RuntimeException("Subscribe exception")).when(consumer)
                 .subscribe(Mockito.any(Collection.class));
 
         Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
         consumerField.setAccessible(true);
         consumerField.set(source, consumer);
-
+        // will throw RuntimeException.
         source.start();
+    }
 
-        Field runningField = KafkaAbstractSource.class.getDeclaredField("running");
-        runningField.setAccessible(true);
+    @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Pool exception")
+    public final void throwExceptionByPoll() throws Exception {
+        KafkaAbstractSource source = new DummySource();
 
-        Awaitility.await().untilAsserted(() -> {
-            Assert.assertFalse((boolean) runningField.get(source));
-            Assert.assertNull(consumerField.get(source));
-        });
+        KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+        kafkaSourceConfig.setTopic("test-topic");
+        Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+        kafkaSourceConfigField.setAccessible(true);
+        kafkaSourceConfigField.set(source, kafkaSourceConfig);
+
+        Consumer consumer = mock(Consumer.class);
+        Mockito.doThrow(new RuntimeException("Pool exception")).when(consumer)
+                .poll(Mockito.any(Duration.class));
+
+        Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
+        consumerField.setAccessible(true);
+        consumerField.set(source, consumer);
+        source.start();
+        // will throw RuntimeException.
+        source.read();
     }
 
     private File getFile(String name) {