SLING-8554 - Refactor to avoid null
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 8046901..3e20d75 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.sling.distribution.journal.messages.Types;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
@@ -78,7 +79,8 @@
         LOG.info("Start poller for types {}", types);
         try {
             while(running) {
-                consume();
+                consumer.poll(ofHours(1))
+                    .forEach(this::handleRecord);
             }
         } catch (WakeupException e) {
             if (running) {
@@ -96,47 +98,42 @@
         LOG.info("Stop poller for types {}", types);
     }
 
-    private void consume() {
-        consumer.poll(ofHours(1))
-                .forEach(this::handleRecord);
-    }
-
     private void handleRecord(ConsumerRecord<String, byte[]> record) {
-        Class<?> type;
-        HandlerAdapter<?> adapter;
+        getHandler(record)
+            .ifPresent(handler->handleRecord(handler, record));
+    }
+
+    private Optional<HandlerAdapter<?>> getHandler(ConsumerRecord<String, byte[]> record) {
         try {
-            type = Types.getType(
-                    parseInt(getHeaderValue(record.headers(), "type")),
-                    parseInt(getHeaderValue(record.headers(), "version")));
-            adapter = handlers.get(type);
-        } catch (RuntimeException e) {
-            LOG.info("Ignoring unknown message");
-            return;
-        }
-        if (adapter != null) {
-            try {
-                handleRecord(adapter, record);
-            } catch (Exception e) {
-                eventSender.send(e);
-                String msg = format("Error consuming message for types %s", types);
-                LOG.warn(msg);
+            int type = parseInt(getHeaderValue(record.headers(), "type"));
+            int version = parseInt(getHeaderValue(record.headers(), "version"));
+            Class<?> messageClass = Types.getType(type, version);
+            Optional<HandlerAdapter<?>> handler = Optional.ofNullable(handlers.get(messageClass));
+            if (!handler.isPresent()) {
+                LOG.debug("No handler registered for type {}", messageClass.getName());
             }
-        } else {
-            LOG.debug("No handler registered for type {}", type.getName());
+            return handler;
+        } catch (RuntimeException e) {
+            LOG.info("No handler found for headers {}.", record.headers(), e);
+            return Optional.empty();
         }
     }
 
-    private void handleRecord(HandlerAdapter<?> adapter, ConsumerRecord<String, byte[]> record) throws Exception {
-        MessageInfo info = new KafkaMessageInfo(record);
-        ByteString payload = ByteString.copyFrom(record.value());
-        adapter.handle(info, payload);
+    private void handleRecord(HandlerAdapter<?> handler, ConsumerRecord<String, byte[]> record) {
+        try {
+            MessageInfo info = new KafkaMessageInfo(record);
+            ByteString payload = ByteString.copyFrom(record.value());
+            handler.handle(info, payload);
+        } catch (Exception e) {
+            eventSender.send(e);
+            String msg = format("Error consuming message for types %s", types);
+            LOG.warn(msg);
+        }
     }
 
     private String getHeaderValue(Headers headers, String key) {
-        Header header = headers.lastHeader(key);
-        if (header == null) {
-            throw new IllegalArgumentException(format("Header with key %s not found", key));
-        }
+        Header header = Optional.ofNullable(headers.lastHeader(key))
+            .orElseThrow(()->new IllegalArgumentException(format("Header with key %s not found", key)));
         return new String(header.value(), UTF_8);
     }
 }