SLING-8400 - Simplify creation of KafkaMessageInfo (#2)

diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
index 0eec98f..7810368 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
@@ -90,11 +90,7 @@
     }
 
     private void handleRecord(ConsumerRecord<String, String> record) {
-        MessageInfo info = new KafkaMessageInfo(
-                record.topic(),
-                record.partition(),
-                record.offset(),
-                record.timestamp());
+        MessageInfo info = new KafkaMessageInfo(record);
         String payload = record.value();
         try {
             T message = reader.readValue(payload);
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
index 4eebfd0..9d49562 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
@@ -27,11 +27,11 @@
     private final long offset;
     private final long createTime;
 
-    public KafkaMessageInfo(String topic, int partition, long offset, long createTime) {
-        this.topic = topic;
-        this.partition = partition;
-        this.offset = offset;
-        this.createTime = createTime;
+    public KafkaMessageInfo(ConsumerRecord<String, ?> record) {
+        this.topic = record.topic();
+        this.partition = record.partition();
+        this.offset = record.offset();
+        this.createTime = record.timestamp();
     }
 
     public String getTopic() {
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 6b22362..687d253 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -114,11 +114,7 @@
     }
 
     private void handleRecord(HandlerAdapter<?> adapter, ConsumerRecord<String, byte[]> record) throws Exception {
-        MessageInfo info = new KafkaMessageInfo(
-                record.topic(),
-                record.partition(),
-                record.offset(),
-                record.timestamp());
+        MessageInfo info = new KafkaMessageInfo(record);
         ByteString payload = ByteString.copyFrom(record.value());
         adapter.handle(info, payload);
     }