SLING-8554 - Refactor to avoid null
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 8046901..3e20d75 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import org.apache.sling.distribution.journal.messages.Types;
import org.apache.sling.distribution.journal.ExceptionEventSender;
@@ -78,7 +79,8 @@
LOG.info("Start poller for types {}", types);
try {
while(running) {
- consume();
+ consumer.poll(ofHours(1))
+ .forEach(this::handleRecord);
}
} catch (WakeupException e) {
if (running) {
@@ -96,47 +98,42 @@
LOG.info("Stop poller for types {}", types);
}
- private void consume() {
- consumer.poll(ofHours(1))
- .forEach(this::handleRecord);
- }
-
private void handleRecord(ConsumerRecord<String, byte[]> record) {
- Class<?> type;
- HandlerAdapter<?> adapter;
+ getHandler(record)
+ .ifPresent(handler->handleRecord(handler, record));
+ }
+
+ private Optional<HandlerAdapter<?>> getHandler(ConsumerRecord<String, byte[]> record) {
try {
- type = Types.getType(
- parseInt(getHeaderValue(record.headers(), "type")),
- parseInt(getHeaderValue(record.headers(), "version")));
- adapter = handlers.get(type);
- } catch (RuntimeException e) {
- LOG.info("Ignoring unknown message");
- return;
- }
- if (adapter != null) {
- try {
- handleRecord(adapter, record);
- } catch (Exception e) {
- eventSender.send(e);
- String msg = format("Error consuming message for types %s", types);
- LOG.warn(msg);
+ int type = parseInt(getHeaderValue(record.headers(), "type"));
+ int version = parseInt(getHeaderValue(record.headers(), "version"));
+ Class<?> messageClass = Types.getType(type, version);
+ Optional<HandlerAdapter<?>> handler = Optional.ofNullable(handlers.get(messageClass));
+ if (!handler.isPresent()) {
+ LOG.debug("No handler registered for type {}", messageClass.getName());
}
- } else {
- LOG.debug("No handler registered for type {}", type.getName());
+ return handler;
+ } catch (RuntimeException e) {
+ LOG.info("No handler found for headers {}.", record.headers(), e);
+ return Optional.empty();
}
}
- private void handleRecord(HandlerAdapter<?> adapter, ConsumerRecord<String, byte[]> record) throws Exception {
- MessageInfo info = new KafkaMessageInfo(record);
- ByteString payload = ByteString.copyFrom(record.value());
- adapter.handle(info, payload);
+ private void handleRecord(HandlerAdapter<?> handler, ConsumerRecord<String, byte[]> record) {
+ try {
+ MessageInfo info = new KafkaMessageInfo(record);
+ ByteString payload = ByteString.copyFrom(record.value());
+ handler.handle(info, payload);
+ } catch (Exception e) {
+ eventSender.send(e);
+ String msg = format("Error consuming message for types %s", types);
+ LOG.warn(msg);
+ }
}
private String getHeaderValue(Headers headers, String key) {
- Header header = headers.lastHeader(key);
- if (header == null) {
- throw new IllegalArgumentException(format("Header with key %s not found", key));
- }
+ Header header = Optional.ofNullable(headers.lastHeader(key))
+ .orElseThrow(()->new IllegalArgumentException(format("Header with key %s not found", key)));
return new String(header.value(), UTF_8);
}
}