SLING-8557 - Fix error handling
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
index 3597d7a..e437ceb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
@@ -69,29 +69,30 @@
 
     public void run() {
         LOG.info("Start JSON poller for handler {}", handler);
-        try {
-            while(running) {
-                consume();
-            }
-        } catch (WakeupException e) {
-            if (running) {
-                LOG.error("Waked up while running {}", e.getMessage(), e);
-                throw e;
-            } else {
+        while(running) {
+            try {
+                consumer.poll(ofHours(1))
+                    .forEach(this::handleRecord);
+            } catch (WakeupException e) {
                 LOG.debug("Waked up while stopping {}", e.getMessage(), e);
+                running = false;
+            } catch(Exception e) {
+                eventSender.send(e);
+                LOG.error("Exception during recieve: {}", e.getMessage(), e);
+                sleepAfterError();
+                // Continue as KafkaConsumer should handle the error transparently
             }
-        } catch(Throwable t) {
-            LOG.error(format("Catch Throwable %s closing consumer", t.getMessage()), t);
-            throw t;
-        } finally {
-            consumer.close();
         }
+        consumer.close();
         LOG.info("Stop JSON poller for handler {}", handler);
     }
-
-    private void consume() {
-        consumer.poll(ofHours(1))
-                .forEach(this::handleRecord);
+    
+    private void sleepAfterError() {
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e1) {
+            Thread.currentThread().interrupt();
+        }
     }
 
     private void handleRecord(ConsumerRecord<String, String> record) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 3e20d75..52ae1a6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -77,27 +77,32 @@
 
     public void run() {
         LOG.info("Start poller for types {}", types);
-        try {
-            while(running) {
+        while(running) {
+            try {
                 consumer.poll(ofHours(1))
-                    .forEach(this::handleRecord);
+                .forEach(this::handleRecord);
+            } catch (WakeupException e) {
+                LOG.debug("Waked up {}", e.getMessage(), e);
+                this.running = false;
+            } catch(Exception e) {
+                eventSender.send(e);
+                LOG.error("Exception while receiving from kafka: {}", e.getMessage(), e);
+                sleepAfterError();
+                // Continue as KafkaConsumer should handle the error transparently
             }
-        } catch (WakeupException e) {
-            if (running) {
-                LOG.error("Waked up while running {}", e.getMessage(), e);
-                throw e;
-            } else {
-                LOG.debug("Waked up while stopping {}", e.getMessage(), e);
-            }
-        } catch(Throwable t) {
-            LOG.error(format("Catch Throwable %s closing consumer", t.getMessage()), t);
-            throw t;
-        } finally {
-            consumer.close();
         }
+        consumer.close();
         LOG.info("Stop poller for types {}", types);
     }
 
+    private void sleepAfterError() {
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e1) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
     private void handleRecord(ConsumerRecord<String, byte[]> record) {
         getHandler(record)
             .ifPresent(handler->handleRecord(handler, record));
@@ -125,7 +130,6 @@
             ByteString payload = ByteString.copyFrom(record.value());
             handler.handle(info, payload);
         } catch (Exception e) {
-            eventSender.send(e);
             String msg = format("Error consuming message for types %s", types);
             LOG.warn(msg);
         }