[fix][io][branch-2.10] Do not restart the instance when the kafka source poll throws an exception. (#20816)
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 3933375..2c4c3dd 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -26,7 +26,6 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -43,7 +42,6 @@
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +49,7 @@
/**
* Simple Kafka Source to transfer messages from a Kafka topic.
*/
-public abstract class KafkaAbstractSource<V> extends PushSource<V> {
+public abstract class KafkaAbstractSource<V> extends KafkaPushSource<V> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaAbstractSource.class);
@@ -126,7 +124,6 @@
throw new IllegalArgumentException("Unable to instantiate Kafka consumer", ex);
}
this.start();
- running = true;
}
protected Properties beforeCreateConsumer(Properties props) {
@@ -151,47 +148,36 @@
@SuppressWarnings("unchecked")
public void start() {
+ LOG.info("Starting subscribe kafka source on {}", kafkaSourceConfig.getTopic());
+ consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
runnerThread = new Thread(() -> {
- LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
- consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
LOG.info("Kafka source started.");
while (running) {
- ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
- CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
- int index = 0;
- for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
- KafkaRecord record = buildRecord(consumerRecord);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema());
+ try {
+ ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
+ CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
+ int index = 0;
+ for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
+ KafkaRecord record = buildRecord(consumerRecord);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema());
+ }
+ consume(record);
+ futures[index] = record.getCompletableFuture();
+ index++;
}
- consume(record);
- futures[index] = record.getCompletableFuture();
- index++;
- }
- if (!kafkaSourceConfig.isAutoCommitEnabled()) {
- try {
+ if (!kafkaSourceConfig.isAutoCommitEnabled()) {
CompletableFuture.allOf(futures).get();
consumer.commitSync();
- } catch (InterruptedException ex) {
- break;
- } catch (ExecutionException ex) {
- LOG.error("Error while processing records", ex);
- break;
}
+ } catch (Exception e) {
+ LOG.error("Error while processing records", e);
+ notifyError(e);
+ break;
}
}
});
- runnerThread.setUncaughtExceptionHandler(
- (t, e) -> {
- new Thread(() -> {
- LOG.error("[{}] Error while consuming records", t.getName(), e);
- try {
- this.close();
- } catch (Exception ex) {
- LOG.error("[{}] Close kafka source error", t.getName(), e);
- }
- }, "Kafka Source Close Task Thread").start();
- });
+ running = true;
runnerThread.setName("Kafka Source Thread");
runnerThread.start();
}
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaPushSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaPushSource.java
new file mode 100644
index 0000000..d0d92da
--- /dev/null
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaPushSource.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+
+/**
+ * Kafka Push Source.
+ * To maintain compatibility, we can't cherry-pick PIP-281 (https://github.com/apache/pulsar/pull/20807)
+ * to this historical branch, so the class is implemented inside the kafka connector instead.
+ */
+public abstract class KafkaPushSource<T> implements Source<T> {
+
+ private static class NullRecord implements Record {
+ @Override
+ public Object getValue() {
+ return null;
+ }
+ }
+
+ private static class ErrorNotifierRecord implements Record {
+ private Exception e;
+ public ErrorNotifierRecord(Exception e) {
+ this.e = e;
+ }
+ @Override
+ public Object getValue() {
+ return null;
+ }
+
+ public Exception getException() {
+ return e;
+ }
+ }
+
+ private LinkedBlockingQueue<Record<T>> queue;
+ private static final int DEFAULT_QUEUE_LENGTH = 1000;
+ private final NullRecord nullRecord = new NullRecord();
+
+ public KafkaPushSource() {
+ this.queue = new LinkedBlockingQueue<>(this.getQueueLength());
+ }
+
+ @Override
+ public Record<T> read() throws Exception {
+ Record<T> record = queue.take();
+ if (record instanceof ErrorNotifierRecord) {
+ throw ((ErrorNotifierRecord) record).getException();
+ }
+ if (record instanceof NullRecord) {
+ return null;
+ } else {
+ return record;
+ }
+ }
+
+ /**
+ * Send this message to be written to Pulsar.
+ * Pass null if you are done with this task.
+ * @param record next message from source which should be sent to a Pulsar topic
+ */
+ public void consume(Record<T> record) {
+ try {
+ if (record != null) {
+ queue.put(record);
+ } else {
+ queue.put(nullRecord);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Get the length of the queue that records are pushed onto.
+ * Users can override this method to customize the queue length
+ * @return queue length
+ */
+ public int getQueueLength() {
+ return DEFAULT_QUEUE_LENGTH;
+ }
+
+ /**
+ * Allows the source to notify errors asynchronously.
+ * @param ex the exception to propagate to the next {@link #read()} call
+ */
+ public void notifyError(Exception ex) {
+ consume(new ErrorNotifierRecord(ex));
+ }
+}
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index ee573ca..54e7ac8 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableMap;
+import java.time.Duration;
import java.util.Collection;
import java.lang.reflect.Field;
import org.apache.kafka.clients.consumer.Consumer;
@@ -31,7 +32,6 @@
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
import org.apache.pulsar.io.kafka.KafkaSourceConfig;
-import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -158,26 +158,47 @@
assertEquals(config.getSslTruststorePassword(), "cert_pwd");
}
- @Test
- public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws Exception {
+ @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Subscribe exception")
+ public final void throwExceptionBySubscribe() throws Exception {
KafkaAbstractSource source = new DummySource();
+
+ KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+ kafkaSourceConfig.setTopic("test-topic");
+ Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+ kafkaSourceConfigField.setAccessible(true);
+ kafkaSourceConfigField.set(source, kafkaSourceConfig);
+
Consumer consumer = mock(Consumer.class);
- Mockito.doThrow(new RuntimeException("Uncaught exception")).when(consumer)
+ Mockito.doThrow(new RuntimeException("Subscribe exception")).when(consumer)
.subscribe(Mockito.any(Collection.class));
Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
consumerField.setAccessible(true);
consumerField.set(source, consumer);
-
+ // will throw RuntimeException.
source.start();
+ }
- Field runningField = KafkaAbstractSource.class.getDeclaredField("running");
- runningField.setAccessible(true);
+ @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Pool exception")
+ public final void throwExceptionByPoll() throws Exception {
+ KafkaAbstractSource source = new DummySource();
- Awaitility.await().untilAsserted(() -> {
- Assert.assertFalse((boolean) runningField.get(source));
- Assert.assertNull(consumerField.get(source));
- });
+ KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+ kafkaSourceConfig.setTopic("test-topic");
+ Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+ kafkaSourceConfigField.setAccessible(true);
+ kafkaSourceConfigField.set(source, kafkaSourceConfig);
+
+ Consumer consumer = mock(Consumer.class);
+ Mockito.doThrow(new RuntimeException("Pool exception")).when(consumer)
+ .poll(Mockito.any(Duration.class));
+
+ Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
+ consumerField.setAccessible(true);
+ consumerField.set(source, consumer);
+ source.start();
+ // will throw RuntimeException.
+ source.read();
}
private File getFile(String name) {