SLING-8557 - Handle exception
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java b/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
index 9a331f2..4435bb4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
@@ -32,6 +32,7 @@
 import org.apache.kafka.common.header.Headers;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.messages.Types;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +71,11 @@
     private void handleRecord(HandlerAdapter<?> handler, ConsumerRecord<String, byte[]> record) {
         MessageInfo info = new KafkaMessageInfo(record);
         ByteString payload = ByteString.copyFrom(record.value());
-        handler.handle(info, payload);
+        try {
+            handler.handle(info, payload);
+        } catch (Exception e) {
+            throw new MessagingException(e.getMessage(), e);
+        }
     }
 
     private String getHeaderValue(Headers headers, String key) {