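SLING-9628 - Fix sonar issues

* JsonRecordHandler no longer implements Consumer and no longer catches
  IOException: accept() now declares "throws IOException", so a payload that
  cannot be parsed is propagated to the caller (KafkaPoller.handle) instead
  of being logged as a warning and dropped. The unused logger is removed.
* KafkaPoller keeps its handlers as Map<String, JsonRecordHandler<?>>,
  extracts a typeName() helper and uses a static import of Collectors.toMap.
* Replace Charset.forName("utf-8") with StandardCharsets.UTF_8 in
  KafkaJsonMessageSender, KafkaPoller and KafkaPollerTest.
* KafkaMessageInfoTest additionally asserts that getProps() is empty.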
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java b/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
index ab29b43..27b1ef4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
@@ -21,19 +21,14 @@
 import static java.util.Objects.requireNonNull;
 
 import java.io.IOException;
-import java.util.function.Consumer;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectReader;
 
-public class JsonRecordHandler<T> implements Consumer<ConsumerRecord<String, String>> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JsonRecordHandler.class);
+public class JsonRecordHandler<T> {
 
     public final MessageHandler<T> handler;
 
@@ -45,14 +40,10 @@
         
     }
 
-    public void accept(ConsumerRecord<String, String> record) {
+    public void accept(ConsumerRecord<String, String> record) throws IOException {
         MessageInfo info = new KafkaMessageInfo(record);
         String payload = record.value();
-        try {
-            T message = reader.readValue(payload);
-            handler.handle(info, message);
-        } catch (IOException e) {
-            LOG.warn("Failed to parse payload {}", payload);
-        }
+        T message = reader.readValue(payload);
+        handler.handle(info, message);
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
index ee8031f..4bc9f31 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
@@ -22,7 +22,7 @@
 import static java.util.Objects.requireNonNull;
 import static org.apache.sling.distribution.journal.kafka.KafkaClientProvider.PARTITION;
 
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -93,6 +93,6 @@
 
 
     private RecordHeader header(String key, String value) {
-        return new RecordHeader(key, value.getBytes(Charset.forName("utf-8")));
+        return new RecordHeader(key, value.getBytes(StandardCharsets.UTF_8));
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
index e48a721..6e31871 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
@@ -20,16 +20,15 @@
 
 import static java.time.Duration.ofHours;
 import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toMap;
 import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -52,7 +51,7 @@
 
     private final KafkaConsumer<String, String> consumer;
 
-    private final Map<String, Consumer<ConsumerRecord<String, String>>> handlers;
+    private final Map<String, JsonRecordHandler<?>> handlers;
     
     private final ExceptionEventSender eventSender;
     
@@ -62,20 +61,22 @@
 
     long errorSleepMs;
 
-
     public KafkaPoller(KafkaConsumer<String, String> consumer, ExceptionEventSender eventSender, List<HandlerAdapter<?>> adapters) {
         this.consumer = requireNonNull(consumer);
         this.eventSender = requireNonNull(eventSender);
         this.errorSleepMs = ERROR_SLEEP_MS;
         mapper = new ObjectMapper();
-        this.handlers = adapters.stream()
-            .collect(Collectors.toMap(adapter -> adapter.getType().getSimpleName(), this::toHandler));
+        this.handlers = adapters.stream().collect(toMap(this::typeName, this::toHandler));
         startBackgroundThread(this::run, "Message Poller");
     }
+
+    private String typeName(HandlerAdapter<?> adapter) {
+        return adapter.getType().getSimpleName();
+    }
     
-    <T> Consumer<ConsumerRecord<String, String>> toHandler(HandlerAdapter<T> adapter) {
+    <T> JsonRecordHandler<T> toHandler(HandlerAdapter<T> adapter) {
         ObjectReader reader = mapper.readerFor(adapter.getType());
-        return new JsonRecordHandler<T>(adapter.getHandler(), reader);
+        return new JsonRecordHandler<>(adapter.getHandler(), reader);
     }
     
     @Override
@@ -107,7 +108,7 @@
     public void handle(ConsumerRecord<String, String> record) {
         try {
             String messageType = getMessageType(record);
-            Consumer<ConsumerRecord<String, String>> handler = handlers.get(messageType);
+            JsonRecordHandler<?> handler = handlers.get(messageType);
             if (handler != null) {
                 handler.accept(record);
             } else {
@@ -124,7 +125,7 @@
             throw new MessagingException("Header " + KafkaMessageInfo.KEY_MESSAGE_TYPE + " missing.");
         }
         Header messageTypeHeader = headers.next();
-        return new String(messageTypeHeader.value(), Charset.forName("utf-8"));
+        return new String(messageTypeHeader.value(), StandardCharsets.UTF_8);
     }
 
     private void sleepAfterError() {
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
index 75a385b..097919e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
@@ -52,5 +52,6 @@
         assertThat(info.getTopic(), equalTo(TOPIC));
         assertThat(info.getCreateTime(), equalTo(TIMESTAMP));
         assertThat(info.toString(), equalTo("Topic: topic, Partition: 0, Offset: 1, CreateTime: 2"));
+        assertThat(info.getProps().size(), equalTo(0));
     }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
index 6061d8f..1d997fe 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
@@ -23,7 +23,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -94,7 +94,7 @@
     }
 
     private RecordHeader header(String key, String value) {
-        return new RecordHeader(key, value.getBytes(Charset.forName("utf-8")));
+        return new RecordHeader(key, value.getBytes(StandardCharsets.UTF_8));
     }
     
     private ConsumerRecords<String, String> records(List<ConsumerRecord<String, String>> records) {