SLING-9628 - Fix sonar issues
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java b/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
index ab29b43..27b1ef4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
@@ -21,19 +21,14 @@
import static java.util.Objects.requireNonNull;
import java.io.IOException;
-import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectReader;
-public class JsonRecordHandler<T> implements Consumer<ConsumerRecord<String, String>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(JsonRecordHandler.class);
+public class JsonRecordHandler<T> {
public final MessageHandler<T> handler;
@@ -45,14 +40,10 @@
}
- public void accept(ConsumerRecord<String, String> record) {
+ public void accept(ConsumerRecord<String, String> record) throws IOException {
MessageInfo info = new KafkaMessageInfo(record);
String payload = record.value();
- try {
- T message = reader.readValue(payload);
- handler.handle(info, message);
- } catch (IOException e) {
- LOG.warn("Failed to parse payload {}", payload);
- }
+ T message = reader.readValue(payload);
+ handler.handle(info, message);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
index ee8031f..4bc9f31 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
@@ -22,7 +22,7 @@
import static java.util.Objects.requireNonNull;
import static org.apache.sling.distribution.journal.kafka.KafkaClientProvider.PARTITION;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -93,6 +93,6 @@
private RecordHeader header(String key, String value) {
- return new RecordHeader(key, value.getBytes(Charset.forName("utf-8")));
+ return new RecordHeader(key, value.getBytes(StandardCharsets.UTF_8));
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
index e48a721..6e31871 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
@@ -20,16 +20,15 @@
import static java.time.Duration.ofHours;
import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toMap;
import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
import java.io.Closeable;
import java.io.IOException;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -52,7 +51,7 @@
private final KafkaConsumer<String, String> consumer;
- private final Map<String, Consumer<ConsumerRecord<String, String>>> handlers;
+ private final Map<String, JsonRecordHandler<?>> handlers;
private final ExceptionEventSender eventSender;
@@ -62,20 +61,22 @@
long errorSleepMs;
-
public KafkaPoller(KafkaConsumer<String, String> consumer, ExceptionEventSender eventSender, List<HandlerAdapter<?>> adapters) {
this.consumer = requireNonNull(consumer);
this.eventSender = requireNonNull(eventSender);
this.errorSleepMs = ERROR_SLEEP_MS;
mapper = new ObjectMapper();
- this.handlers = adapters.stream()
- .collect(Collectors.toMap(adapter -> adapter.getType().getSimpleName(), this::toHandler));
+ this.handlers = adapters.stream().collect(toMap(this::typeName, this::toHandler));
startBackgroundThread(this::run, "Message Poller");
}
+
+ private String typeName(HandlerAdapter<?> adapter) {
+ return adapter.getType().getSimpleName();
+ }
- <T> Consumer<ConsumerRecord<String, String>> toHandler(HandlerAdapter<T> adapter) {
+ <T> JsonRecordHandler<T> toHandler(HandlerAdapter<T> adapter) {
ObjectReader reader = mapper.readerFor(adapter.getType());
- return new JsonRecordHandler<T>(adapter.getHandler(), reader);
+ return new JsonRecordHandler<>(adapter.getHandler(), reader);
}
@Override
@@ -107,7 +108,7 @@
public void handle(ConsumerRecord<String, String> record) {
try {
String messageType = getMessageType(record);
- Consumer<ConsumerRecord<String, String>> handler = handlers.get(messageType);
+ JsonRecordHandler<?> handler = handlers.get(messageType);
if (handler != null) {
handler.accept(record);
} else {
@@ -124,7 +125,7 @@
throw new MessagingException("Header " + KafkaMessageInfo.KEY_MESSAGE_TYPE + " missing.");
}
Header messageTypeHeader = headers.next();
- return new String(messageTypeHeader.value(), Charset.forName("utf-8"));
+ return new String(messageTypeHeader.value(), StandardCharsets.UTF_8);
}
private void sleepAfterError() {
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
index 75a385b..097919e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
@@ -52,5 +52,6 @@
assertThat(info.getTopic(), equalTo(TOPIC));
assertThat(info.getCreateTime(), equalTo(TIMESTAMP));
assertThat(info.toString(), equalTo("Topic: topic, Partition: 0, Offset: 1, CreateTime: 2"));
+ assertThat(info.getProps().size(), equalTo(0));
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
index 6061d8f..1d997fe 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
@@ -23,7 +23,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -94,7 +94,7 @@
}
private RecordHeader header(String key, String value) {
- return new RecordHeader(key, value.getBytes(Charset.forName("utf-8")));
+ return new RecordHeader(key, value.getBytes(StandardCharsets.UTF_8));
}
private ConsumerRecords<String, String> records(List<ConsumerRecord<String, String>> records) {