SLING-8557 - Fix error handling
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
index 3597d7a..e437ceb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
@@ -69,29 +69,30 @@
public void run() {
LOG.info("Start JSON poller for handler {}", handler);
- try {
- while(running) {
- consume();
- }
- } catch (WakeupException e) {
- if (running) {
- LOG.error("Waked up while running {}", e.getMessage(), e);
- throw e;
- } else {
+ while(running) {
+ try {
+ consumer.poll(ofHours(1))
+ .forEach(this::handleRecord);
+ } catch (WakeupException e) {
LOG.debug("Waked up while stopping {}", e.getMessage(), e);
+ running = false;
+ } catch(Exception e) {
+ eventSender.send(e);
+ LOG.error("Exception during recieve: {}", e.getMessage(), e);
+ sleepAfterError();
+ // Continue as KafkaConsumer should handle the error transparently
}
- } catch(Throwable t) {
- LOG.error(format("Catch Throwable %s closing consumer", t.getMessage()), t);
- throw t;
- } finally {
- consumer.close();
}
+ consumer.close();
LOG.info("Stop JSON poller for handler {}", handler);
}
-
- private void consume() {
- consumer.poll(ofHours(1))
- .forEach(this::handleRecord);
+
+ private void sleepAfterError() {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
}
private void handleRecord(ConsumerRecord<String, String> record) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 3e20d75..52ae1a6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -77,27 +77,32 @@
public void run() {
LOG.info("Start poller for types {}", types);
- try {
- while(running) {
+ while(running) {
+ try {
consumer.poll(ofHours(1))
- .forEach(this::handleRecord);
+ .forEach(this::handleRecord);
+ } catch (WakeupException e) {
+ LOG.debug("Waked up {}", e.getMessage(), e);
+ this.running = false;
+ } catch(Exception e) {
+ eventSender.send(e);
+ LOG.error("Exception while receiving from kafka: {}", e.getMessage(), e);
+ sleepAfterError();
+ // Continue as KafkaConsumer should handle the error transparently
}
- } catch (WakeupException e) {
- if (running) {
- LOG.error("Waked up while running {}", e.getMessage(), e);
- throw e;
- } else {
- LOG.debug("Waked up while stopping {}", e.getMessage(), e);
- }
- } catch(Throwable t) {
- LOG.error(format("Catch Throwable %s closing consumer", t.getMessage()), t);
- throw t;
- } finally {
- consumer.close();
}
+ consumer.close();
LOG.info("Stop poller for types {}", types);
}
+ private void sleepAfterError() {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
private void handleRecord(ConsumerRecord<String, byte[]> record) {
getHandler(record)
.ifPresent(handler->handleRecord(handler, record));
@@ -125,7 +130,6 @@
ByteString payload = ByteString.copyFrom(record.value());
handler.handle(info, payload);
} catch (Exception e) {
- eventSender.send(e);
String msg = format("Error consuming message for types %s", types);
LOG.warn(msg);
}