SLING-8557 - Handle exception
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java b/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
index 9a331f2..4435bb4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
@@ -32,6 +32,7 @@
import org.apache.kafka.common.header.Headers;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.messages.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +71,11 @@
private void handleRecord(HandlerAdapter<?> handler, ConsumerRecord<String, byte[]> record) {
MessageInfo info = new KafkaMessageInfo(record);
ByteString payload = ByteString.copyFrom(record.value());
- handler.handle(info, payload);
+ try {
+ handler.handle(info, payload);
+ } catch (Exception e) {
+ throw new MessagingException(e.getMessage(), e);
+ }
}
private String getHeaderValue(Headers headers, String key) {