SLING-8557 - Use a common class for pollers, improve test coverage
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java b/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
new file mode 100644
index 0000000..14fd391
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+
+public class JsonRecordHandler<T> implements Consumer<ConsumerRecord<String, String>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JsonRecordHandler.class);
+
+    private final MessageHandler<T> handler;
+
+    private final ObjectReader reader;
+
+    public JsonRecordHandler(MessageHandler<T> handler, Class<T> clazz) {
+        this.handler = requireNonNull(handler);
+        ObjectMapper mapper = new ObjectMapper();
+        reader = mapper.readerFor(requireNonNull(clazz));
+    }
+
+    @Override
+    public void accept(ConsumerRecord<String, String> record) {
+        MessageInfo info = new KafkaMessageInfo(record);
+        String payload = record.value();
+        try {
+            T message = reader.readValue(payload);
+            handler.handle(info, message);
+        } catch (IOException e) {
+            LOG.warn("Failed to parse payload {}", payload);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index 9cc9703..199e7fe 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -146,7 +146,7 @@
         } else {
             consumer.seekToEnd(topicPartitions);
         }
-        Closeable poller = new KafkaMessagePoller(consumer, eventSender, adapters);
+        Closeable poller = KafkaPoller.createProtobufPoller(consumer, eventSender, adapters);
         LOG.info("Created poller for reset {}, topicName {}, assign {}", reset, topicName, assign);
         return poller;
     }
@@ -167,7 +167,7 @@
         } else {
             consumer.seekToEnd(topicPartitions);
         }
-        return new KafkaJsonMessagePoller<>(consumer, eventSender, handler, type);
+        return KafkaPoller.createJsonPoller(consumer, eventSender, handler, type);
     }
 
     @Override
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
deleted file mode 100644
index e437ceb..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.kafka;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.sling.distribution.journal.ExceptionEventSender;
-import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageInfo;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.errors.WakeupException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
-import static java.lang.String.format;
-import static java.time.Duration.ofHours;
-import static java.util.Objects.requireNonNull;
-
-public class KafkaJsonMessagePoller<T> implements Closeable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaJsonMessagePoller.class);
-
-    private volatile boolean running = true;
-
-    private final KafkaConsumer<String, String> consumer;
-
-    private final MessageHandler<T> handler;
-
-    private final ObjectReader reader;
-
-    private final ExceptionEventSender eventSender;
-
-    public KafkaJsonMessagePoller(KafkaConsumer<String, String> consumer, ExceptionEventSender eventSender, MessageHandler<T> handler, Class<T> clazz) {
-        this.consumer = requireNonNull(consumer);
-        this.eventSender = requireNonNull(eventSender);
-        this.handler = requireNonNull(handler);
-        ObjectMapper mapper = new ObjectMapper();
-        reader = mapper.readerFor(requireNonNull(clazz));
-        startBackgroundThread(this::run, format("Message Json Poller for handler %s", handler));
-    }
-
-    @Override
-    public void close() throws IOException {
-        LOG.info("Shutdown JSON poller for handler {}", handler);
-        running = false;
-        consumer.wakeup();
-    }
-
-    public void run() {
-        LOG.info("Start JSON poller for handler {}", handler);
-        while(running) {
-            try {
-                consumer.poll(ofHours(1))
-                    .forEach(this::handleRecord);
-            } catch (WakeupException e) {
-                LOG.debug("Waked up while stopping {}", e.getMessage(), e);
-                running = false;
-            } catch(Exception e) {
-                eventSender.send(e);
-                LOG.error("Exception during recieve: {}", e.getMessage(), e);
-                sleepAfterError();
-                // Continue as KafkaConsumer should handle the error transparently
-            }
-        }
-        consumer.close();
-        LOG.info("Stop JSON poller for handler {}", handler);
-    }
-    
-    private void sleepAfterError() {
-        try {
-            Thread.sleep(10000);
-        } catch (InterruptedException e1) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    private void handleRecord(ConsumerRecord<String, String> record) {
-        MessageInfo info = new KafkaMessageInfo(record);
-        String payload = record.value();
-        try {
-            T message = reader.readValue(payload);
-            handler.handle(info, message);
-        } catch (IOException e) {
-            eventSender.send(e);
-            LOG.error("Failed to parse payload {}", payload);
-        }
-    }
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
deleted file mode 100644
index 52ae1a6..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.kafka;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.sling.distribution.journal.messages.Types;
-import org.apache.sling.distribution.journal.ExceptionEventSender;
-import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageInfo;
-import com.google.protobuf.ByteString;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
-import static java.lang.Integer.parseInt;
-import static java.lang.String.format;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.time.Duration.ofHours;
-import static java.util.Objects.requireNonNull;
-
-public class KafkaMessagePoller implements Closeable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessagePoller.class);
-
-    private final Map<Class<?>, HandlerAdapter<?>> handlers = new HashMap<>();
-
-    private final KafkaConsumer<String, byte[]> consumer;
-
-    private volatile boolean running = true;
-
-    private final String types;
-
-    private final ExceptionEventSender eventSender;
-
-    public KafkaMessagePoller(KafkaConsumer<String, byte[]> consumer, ExceptionEventSender eventSender, HandlerAdapter<?>... handlerAdapters) {
-        this.consumer = requireNonNull(consumer);
-        this.eventSender = requireNonNull(eventSender);
-        for (HandlerAdapter<?> handlerAdapter : handlerAdapters) {
-            handlers.put(handlerAdapter.getType(), handlerAdapter);
-        }
-        types = handlers.keySet().toString();
-        startBackgroundThread(this::run, format("Message Poller %s", types));
-    }
-
-    @Override
-    public void close() throws IOException {
-        LOG.info("Shutdown poller for types {}", types);
-        running = false;
-        consumer.wakeup();
-    }
-
-    public void run() {
-        LOG.info("Start poller for types {}", types);
-        while(running) {
-            try {
-                consumer.poll(ofHours(1))
-                .forEach(this::handleRecord);
-            } catch (WakeupException e) {
-                LOG.debug("Waked up {}", e.getMessage(), e);
-                this.running = false;
-            } catch(Exception e) {
-                eventSender.send(e);
-                LOG.error("Exception while receiving from kafka: {}", e.getMessage(), e);
-                sleepAfterError();
-                // Continue as KafkaConsumer should handle the error transparently
-            }
-        }
-        consumer.close();
-        LOG.info("Stop poller for types {}", types);
-    }
-
-    private void sleepAfterError() {
-        try {
-            Thread.sleep(10000);
-        } catch (InterruptedException e1) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    private void handleRecord(ConsumerRecord<String, byte[]> record) {
-        getHandler(record)
-            .ifPresent(handler->handleRecord(handler, record));
-    }
-
-    private Optional<HandlerAdapter<?>> getHandler(ConsumerRecord<String, byte[]> record) {
-        try {
-            int type = parseInt(getHeaderValue(record.headers(), "type"));
-            int version = parseInt(getHeaderValue(record.headers(), "version"));
-            Class<?> messageClass = Types.getType(type, version);
-            Optional<HandlerAdapter<?>> handler = Optional.ofNullable(handlers.get(messageClass));
-            if (!handler.isPresent()) {
-                LOG.debug("No handler registered for type {}", messageClass.getName());
-            }
-            return handler;
-        } catch (RuntimeException e) {
-            LOG.info("No handler found for headers {}.", record.headers(), e);
-            return Optional.empty();
-        }
-    }
-
-    private void handleRecord(HandlerAdapter<?> handler, ConsumerRecord<String, byte[]> record) {
-        try {
-            MessageInfo info = new KafkaMessageInfo(record);
-            ByteString payload = ByteString.copyFrom(record.value());
-            handler.handle(info, payload);
-        } catch (Exception e) {
-            String msg = format("Error consuming message for types %s", types);
-            LOG.warn(msg);
-        }
-    }
-
-    private String getHeaderValue(Headers headers, String key) {
-        Header header = Optional.ofNullable(headers.lastHeader(key))
-            .orElseThrow(()->new IllegalArgumentException(format("Header with key %s not found", key)));
-        return new String(header.value(), UTF_8);
-    }
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
new file mode 100644
index 0000000..671fa28
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static java.time.Duration.ofHours;
+import static java.util.Objects.requireNonNull;
+import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaPoller<T> implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaPoller.class);
+
+    private static final long ERROR_SLEEP_MS = 10000;
+
+    private final KafkaConsumer<String, T> consumer;
+
+    private final Consumer<ConsumerRecord<String, T>> handler;
+    
+    private final ExceptionEventSender eventSender;
+    
+    private volatile boolean running = true;
+
+    long errorSleepMs;
+
+    public KafkaPoller(KafkaConsumer<String, T> consumer, ExceptionEventSender eventSender, Consumer<ConsumerRecord<String, T>> handler) {
+        this.handler = handler;
+        this.consumer = requireNonNull(consumer);
+        this.eventSender = requireNonNull(eventSender);
+        this.errorSleepMs = ERROR_SLEEP_MS;
+        startBackgroundThread(this::run, "Message Poller");
+    }
+    
+    public static Closeable createProtobufPoller(KafkaConsumer<String, byte[]> consumer, ExceptionEventSender eventSender, HandlerAdapter<?>... adapters) {
+        Consumer<ConsumerRecord<String, byte[]>> handler = new ProtobufRecordHandler(adapters);
+        return new KafkaPoller<byte[]>(consumer, eventSender, handler);
+    }
+    
+    public static <T>  Closeable createJsonPoller(KafkaConsumer<String, String> consumer, ExceptionEventSender eventSender, MessageHandler<T> handler, Class<T> clazz) {
+        Consumer<ConsumerRecord<String, String>> recordHandler = new JsonRecordHandler<T>(handler, clazz);
+        return new KafkaPoller<String>(consumer, eventSender, recordHandler);
+    }
+
+    @Override
+    public void close() throws IOException {
+        LOG.info("Shutdown poller");
+        running = false;
+        consumer.wakeup();
+    }
+
+    public void run() {
+        LOG.info("Start poller");
+        while(running) {
+            try {
+                consumer.poll(ofHours(1)).forEach(this::handle);
+            } catch (WakeupException e) {
+                LOG.debug("Waked up {}", e.getMessage(), e);
+                this.running = false;
+            } catch (Exception e) {
+                eventSender.send(e);
+                LOG.error("Exception while receiving from kafka: {}", e.getMessage(), e);
+                sleepAfterError();
+                // Continue as KafkaConsumer should handle the error transparently
+            }
+        }
+        consumer.close();
+        LOG.info("Stopped poller");
+    }
+    
+    public void handle(ConsumerRecord<String, T> record) {
+        try {
+            handler.accept(record);
+        } catch (Exception e) {
+            LOG.warn("Error consuming message {}", record.headers());
+        }
+    }
+
+    private void sleepAfterError() {
+        try {
+            Thread.sleep(errorSleepMs);
+        } catch (InterruptedException e1) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java b/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
new file mode 100644
index 0000000..9a331f2
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static java.lang.Integer.parseInt;
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.messages.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+public class ProtobufRecordHandler implements Consumer<ConsumerRecord<String, byte[]>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProtobufRecordHandler.class);
+
+    private final Map<Class<?>, HandlerAdapter<?>> handlers = new HashMap<>();
+
+    public ProtobufRecordHandler(HandlerAdapter<?>... handlerAdapters) {
+        for (HandlerAdapter<?> handlerAdapter : handlerAdapters) {
+            handlers.put(handlerAdapter.getType(), handlerAdapter);
+        }
+    }
+
+    @Override
+    public void accept(ConsumerRecord<String, byte[]> record) {
+        getHandler(record)
+            .ifPresent(handler->handleRecord(handler, record));
+    }
+
+    private Optional<HandlerAdapter<?>> getHandler(ConsumerRecord<String, byte[]> record) {
+        int type = parseInt(getHeaderValue(record.headers(), "type"));
+        int version = parseInt(getHeaderValue(record.headers(), "version"));
+        Class<?> messageClass = Types.getType(type, version);
+        Optional<HandlerAdapter<?>> handler = Optional.ofNullable(handlers.get(messageClass));
+        if (!handler.isPresent()) {
+            LOG.debug("No handler registered for type {}", messageClass.getName());
+        }
+        return handler;
+    }
+
+    private void handleRecord(HandlerAdapter<?> handler, ConsumerRecord<String, byte[]> record) {
+        MessageInfo info = new KafkaMessageInfo(record);
+        ByteString payload = ByteString.copyFrom(record.value());
+        handler.handle(info, payload);
+    }
+
+    private String getHeaderValue(Headers headers, String key) {
+        Header header = Optional.ofNullable(headers.lastHeader(key))
+            .orElseThrow(()->new IllegalArgumentException(format("Header with key %s not found", key)));
+        return new String(header.value(), UTF_8);
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java
new file mode 100644
index 0000000..27926bf
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
+import org.apache.sling.distribution.journal.MessagingException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaJsonMessageSenderTest {
+
+    private static final String TOPIC = "topic";
+
+    @Mock
+    private ExceptionEventSender eventSender;
+    
+    @Mock
+    private KafkaProducer<String, byte[]> producer;
+    
+    @InjectMocks
+    private KafkaJsonMessageSender<Person> sender;
+
+    @Mock
+    private Future<RecordMetadata> record;
+
+    @Test(expected = MessagingException.class)
+    public void testSendError() throws Exception {
+        when(producer.send(Mockito.any())).thenReturn(record);
+        when(record.get()).thenThrow(new ExecutionException(new IOException("Expected")));
+        Person person = new Person();
+        person.name = "name";
+        sender.send(TOPIC, person);
+    }
+    
+    public static class Person {
+        public String name;
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java
deleted file mode 100644
index 8947696..0000000
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.kafka;
-
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.TimestampType;
-import org.apache.sling.distribution.journal.ExceptionEventSender;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class KafkaMessagePollerTest {
-    @Mock
-    ExceptionEventSender eventSender;
-    
-    @Mock
-    KafkaConsumer<String, byte[]> consumer;
-
-    private Semaphore sem = new Semaphore(0);
-
-    @Test
-    public void testNoHeader() throws IOException, InterruptedException {
-        ConsumerRecord<String, byte[]> record = new ConsumerRecord<String, byte[]>("topic", 1, 0l, 0l, TimestampType.CREATE_TIME, 0, 0, 0, "key", null);
-        when(consumer.poll(Mockito.any()))
-            .thenReturn(records(Collections.singletonList(record)))
-            .thenReturn(records(Collections.emptyList()));
-        // Should display java.lang.IllegalArgumentException in log
-        try (KafkaMessagePoller poller = new KafkaMessagePoller(consumer, eventSender, create(DiscoveryMessage.class, this::handle))) {
-            Assert.assertThat(sem.tryAcquire(100, TimeUnit.MILLISECONDS), equalTo(false));
-        }
-    }
-
-    private void handle(MessageInfo info, DiscoveryMessage message) {
-        sem.release();
-    }
-
-    private ConsumerRecords<String, byte[]> records(List<ConsumerRecord<String, byte[]>> records) {
-        Map<TopicPartition, List<ConsumerRecord<String, byte[]>>> rm = new HashMap<>();
-        for (ConsumerRecord<String, byte[]> record : records) {
-            rm.put(new TopicPartition(record.topic(), record.partition()), Arrays.asList(record));
-        }
-        return new ConsumerRecords<>(rm);
-    }
-}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
new file mode 100644
index 0000000..93778d2
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaPollerTest {
+
+    @Mock
+    private ExceptionEventSender eventSender;
+    
+    @Mock
+    private KafkaConsumer<String, String> consumer;
+
+    @Mock
+    private Consumer<ConsumerRecord<String, String>> handler;
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testHandleError() throws Exception {
+        ConsumerRecord<String, String> record = new ConsumerRecord<String, String>("topic", 1, 0l, "", "");
+        when(consumer.poll(Mockito.any()))
+            .thenReturn(records(Arrays.asList(record)))
+            .thenThrow(new KafkaException("Expected"))
+            .thenThrow(new WakeupException());
+        doThrow(new RuntimeException("Expected")).when(handler).accept(Mockito.any(ConsumerRecord.class));
+        KafkaPoller<String> poller = new KafkaPoller<String>(consumer, eventSender, handler);
+        poller.errorSleepMs = 100;
+        // Should see "Error consuming message" in the log
+        verify(handler, timeout(1000)).accept(Mockito.any(ConsumerRecord.class));
+        verify(eventSender, timeout(1000)).send(Mockito.any(KafkaException.class));
+        verify(consumer, timeout(1000)).close();
+        poller.close();
+    }
+    
+    private ConsumerRecords<String, String> records(List<ConsumerRecord<String, String>> records) {
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> rm = new HashMap<>();
+        for (ConsumerRecord<String, String> record : records) {
+            rm.put(new TopicPartition(record.topic(), record.partition()), Arrays.asList(record));
+        }
+        return new ConsumerRecords<>(rm);
+    }
+    
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandlerTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandlerTest.java
new file mode 100644
index 0000000..3c76ea7
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandlerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+
+import java.io.IOException;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProtobufRecordHandlerTest {
+    @Mock
+    ExceptionEventSender eventSender;
+    
+    @Mock
+    KafkaConsumer<String, byte[]> consumer;
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNoHeader() throws IOException, InterruptedException {
+        ConsumerRecord<String, byte[]> record = new ConsumerRecord<String, byte[]>("topic", 1, 0l, 0l, TimestampType.CREATE_TIME, 0, 0, 0, "key", null);
+        ProtobufRecordHandler handler = new ProtobufRecordHandler(create(DiscoveryMessage.class, this::handle));
+        handler.accept(record);
+    }
+
+    private void handle(MessageInfo info, DiscoveryMessage message) {
+    }
+}