Add support for LogEvent timestamp embedding in ProducerRecord
diff --git a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java
index b9b5f5c..5f8e320 100644
--- a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java
+++ b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java
@@ -59,6 +59,9 @@
         @PluginAttribute(value = "syncSend", defaultBoolean = true)
         private boolean syncSend;
 
+        @PluginAttribute(value = "timestamp", defaultBoolean = false)
+        private boolean sendTimestamp;
+
         @SuppressWarnings("resource")
         @Override
         public KafkaAppender build() {
@@ -68,7 +71,7 @@
                 return null;
             }
             final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(),
-                    getName(), topic, syncSend, getPropertyArray(), key);
+                    getName(), topic, syncSend, sendTimestamp, getPropertyArray(), key);
             return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), getPropertyArray(), kafkaManager);
         }
 
@@ -80,6 +83,8 @@
             return syncSend;
         }
 
+        public boolean isSendTimestamp() { return sendTimestamp; }
+
         public B setTopic(final String topic) {
             this.topic = topic;
             return asBuilder();
@@ -89,6 +94,11 @@
             this.syncSend = syncSend;
             return asBuilder();
         }
+
+        public B setSendTimestamp(boolean sendTimestamp) {
+            this.sendTimestamp = sendTimestamp;
+            return asBuilder();
+        }
     }
 
     /**
@@ -125,8 +135,11 @@
     private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
         final Layout<? extends Serializable> layout = getLayout();
         byte[] data;
+        Long eventTimestamp;
+
         data = layout.toByteArray(event);
-        manager.send(data);
+        eventTimestamp = event.getTimeMillis();
+        manager.send(data, eventTimestamp);
     }
 
     @Override
diff --git a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
index 2085cb3..4f82f07 100644
--- a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
+++ b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
@@ -51,6 +51,7 @@
     private final String topic;
     private final String key;
     private final boolean syncSend;
+    private final boolean sendTimestamp;
 
     private static final KafkaManagerFactory factory = new KafkaManagerFactory();
 
@@ -58,10 +59,11 @@
      * The Constructor should have been declared private as all Managers are create by the internal factory;
      */
     private KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend,
-                        final Property[] properties, final String key) {
+                        final boolean sendTimestamp, final Property[] properties, final String key) {
         super(loggerContext, name);
         this.topic = Objects.requireNonNull(topic, "topic");
         this.syncSend = syncSend;
+        this.sendTimestamp = sendTimestamp;
         config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
         config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
         config.setProperty("batch.size", "0");
@@ -106,9 +108,10 @@
         }
     }
 
-    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
+    public void send(final byte[] msg, final Long eventTimestamp) throws ExecutionException, InterruptedException, TimeoutException {
         if (producer != null) {
             byte[] newKey = null;
+            Long timestamp = null;
 
             if(key != null && key.contains("${")) {
                 newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
@@ -116,7 +119,11 @@
                 newKey = key.getBytes(StandardCharsets.UTF_8);
             }
 
-            final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
+            if(sendTimestamp) {
+                timestamp = eventTimestamp;
+            }
+
+            final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, null, timestamp, newKey, msg);
             if (syncSend) {
                 final Future<RecordMetadata> response = producer.send(newRecord);
                 response.get(timeoutMillis, TimeUnit.MILLISECONDS);
@@ -142,26 +149,28 @@
     }
 
     public static KafkaManager getManager(final LoggerContext loggerContext, final String name, final String topic,
-            final boolean syncSend, final Property[] properties, final String key) {
+            final boolean syncSend, final boolean sendTimestamp, final Property[] properties, final String key) {
         StringBuilder sb = new StringBuilder(name);
         for (Property prop: properties) {
             sb.append(" ").append(prop.getName()).append("=").append(prop.getValue());
         }
-        return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, properties, key));
+        return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, sendTimestamp, properties, key));
     }
 
     private static class FactoryData {
         private final LoggerContext loggerContext;
         private final String topic;
         private final boolean syncSend;
+        private final boolean sendTimestamp;
         private final Property[] properties;
         private final String key;
 
         public FactoryData(final LoggerContext loggerContext, final String topic, final boolean syncSend,
-                final Property[] properties, final String key) {
+                final boolean sendTimestamp, final Property[] properties, final String key) {
             this.loggerContext = loggerContext;
             this.topic = topic;
             this.syncSend = syncSend;
+            this.sendTimestamp = sendTimestamp;
             this.properties = properties;
             this.key = key;
         }
@@ -171,7 +180,7 @@
     private static class KafkaManagerFactory implements ManagerFactory<KafkaManager, FactoryData> {
         @Override
         public KafkaManager createManager(String name, FactoryData data) {
-            return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.properties, data.key);
+            return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.sendTimestamp, data.properties, data.key);
         }
     }