Merge branch 'fedexist-kafka-timestamp'
diff --git a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java
index 8f7de26..36ab5ec 100644
--- a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java
+++ b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java
@@ -59,6 +59,9 @@
         @PluginAttribute(defaultBoolean = true)
         private boolean syncSend;
 
+        @PluginAttribute(value = "eventTimestamp", defaultBoolean = true)
+        private boolean sendEventTimestamp;
+
         @SuppressWarnings("resource")
         @Override
         public KafkaAppender build() {
@@ -68,7 +71,7 @@
                 return null;
             }
             final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(),
-                    getName(), topic, syncSend, getPropertyArray(), key);
+                    getName(), topic, syncSend, sendEventTimestamp, getPropertyArray(), key);
             return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), getPropertyArray(), kafkaManager);
         }
 
@@ -80,6 +83,10 @@
             return syncSend;
         }
 
+        public boolean isSendEventTimestamp() {
+            return sendEventTimestamp;
+        }
+
         public B setTopic(final String topic) {
             this.topic = topic;
             return asBuilder();
@@ -94,6 +101,11 @@
             this.syncSend = syncSend;
             return asBuilder();
         }
+
+        public B setSendEventTimestamp(final boolean sendEventTimestamp) {
+            this.sendEventTimestamp = sendEventTimestamp; // true: forward the LogEvent time to the Kafka record
+            return asBuilder();
+        }
     }
 
     /**
@@ -130,8 +142,11 @@
     private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
         final Layout<? extends Serializable> layout = getLayout();
         byte[] data;
+        // Primitive on purpose: getTimeMillis() never yields null; the manager decides whether to use it.
+        final long eventTimestamp;
         data = layout.toByteArray(event);
-        manager.send(data);
+        eventTimestamp = event.getTimeMillis();
+        manager.send(data, eventTimestamp);
     }
 
     @Override
diff --git a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
index 2085cb3..4f82f07 100644
--- a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
+++ b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
@@ -51,6 +51,7 @@
     private final String topic;
     private final String key;
     private final boolean syncSend;
+    private final boolean sendTimestamp;
 
     private static final KafkaManagerFactory factory = new KafkaManagerFactory();
 
@@ -58,10 +59,11 @@
      * The Constructor should have been declared private as all Managers are create by the internal factory;
      */
     private KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend,
-                        final Property[] properties, final String key) {
+                        final boolean sendTimestamp, final Property[] properties, final String key) {
         super(loggerContext, name);
         this.topic = Objects.requireNonNull(topic, "topic");
         this.syncSend = syncSend;
+        this.sendTimestamp = sendTimestamp;
         config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
         config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
         config.setProperty("batch.size", "0");
@@ -106,9 +108,10 @@
         }
     }
 
-    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
+    public void send(final byte[] msg, final Long eventTimestamp) throws ExecutionException, InterruptedException, TimeoutException {
         if (producer != null) {
             byte[] newKey = null;
+            Long timestamp = null;
 
             if(key != null && key.contains("${")) {
                 newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
@@ -116,7 +119,11 @@
                 newKey = key.getBytes(StandardCharsets.UTF_8);
             }
 
-            final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
+            if(sendTimestamp) {
+                timestamp = eventTimestamp;
+            }
+
+            final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, null, timestamp, newKey, msg);
             if (syncSend) {
                 final Future<RecordMetadata> response = producer.send(newRecord);
                 response.get(timeoutMillis, TimeUnit.MILLISECONDS);
@@ -142,26 +149,28 @@
     }
 
     public static KafkaManager getManager(final LoggerContext loggerContext, final String name, final String topic,
-            final boolean syncSend, final Property[] properties, final String key) {
+            final boolean syncSend, final boolean sendTimestamp, final Property[] properties, final String key) {
         StringBuilder sb = new StringBuilder(name);
         for (Property prop: properties) {
             sb.append(" ").append(prop.getName()).append("=").append(prop.getValue());
         }
-        return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, properties, key));
+        return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, sendTimestamp, properties, key));
     }
 
     private static class FactoryData {
         private final LoggerContext loggerContext;
         private final String topic;
         private final boolean syncSend;
+        private final boolean sendTimestamp;
         private final Property[] properties;
         private final String key;
 
         public FactoryData(final LoggerContext loggerContext, final String topic, final boolean syncSend,
-                final Property[] properties, final String key) {
+                final boolean sendTimestamp, final Property[] properties, final String key) {
             this.loggerContext = loggerContext;
             this.topic = topic;
             this.syncSend = syncSend;
+            this.sendTimestamp = sendTimestamp;
             this.properties = properties;
             this.key = key;
         }
@@ -171,7 +180,7 @@
     private static class KafkaManagerFactory implements ManagerFactory<KafkaManager, FactoryData> {
         @Override
         public KafkaManager createManager(String name, FactoryData data) {
-            return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.properties, data.key);
+            return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.sendTimestamp, data.properties, data.key);
         }
     }
 
diff --git a/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaAppenderTest.java b/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaAppenderTest.java
index a4da443..f726fd4 100644
--- a/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaAppenderTest.java
+++ b/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaAppenderTest.java
@@ -126,6 +126,7 @@
         assertNotNull(item);
         assertEquals(TOPIC_NAME, item.topic());
         byte[] keyValue = "key".getBytes(StandardCharsets.UTF_8);
+        assertEquals(Long.valueOf(logEvent.getTimeMillis()), item.timestamp());
         assertArrayEquals(item.key(), keyValue);
         assertEquals(LOG_MESSAGE, new String(item.value(), StandardCharsets.UTF_8));
     }
@@ -143,11 +144,28 @@
         assertNotNull(item);
         assertEquals(TOPIC_NAME, item.topic());
         byte[] keyValue = format.format(date).getBytes(StandardCharsets.UTF_8);
+        assertEquals(Long.valueOf(logEvent.getTimeMillis()), item.timestamp());
         assertArrayEquals(item.key(), keyValue);
         assertEquals(LOG_MESSAGE, new String(item.value(), StandardCharsets.UTF_8));
     }
 
 
+    @Test
+    public void testAppenderNoEventTimestamp() throws Exception {
+        final Appender appender = ctx.getRequiredAppender("KafkaAppenderNoEventTimestamp");
+        final LogEvent logEvent = createLogEvent();
+        appender.append(logEvent);
+        final List<ProducerRecord<byte[], byte[]>> history = kafka.history();
+        assertEquals(1, history.size());
+        final ProducerRecord<byte[], byte[]> item = history.get(0);
+        assertNotNull(item);
+        assertEquals(TOPIC_NAME, item.topic());
+        final byte[] keyValue = "key".getBytes(StandardCharsets.UTF_8);
+        assertArrayEquals(keyValue, item.key()); // expected value first, then actual
+        assertNull(item.timestamp()); // eventTimestamp="false": the record must carry no explicit timestamp
+        assertEquals(LOG_MESSAGE, new String(item.value(), StandardCharsets.UTF_8));
+    }
+
     private LogEvent deserializeLogEvent(final byte[] data) throws IOException, ClassNotFoundException {
         final ByteArrayInputStream bis = new ByteArrayInputStream(data);
         try (ObjectInput ois = new FilteredObjectInputStream(bis)) {
diff --git a/log4j-core/src/test/resources/KafkaAppenderCloseTimeoutTest.xml b/log4j-kafka/src/test/resources/KafkaAppenderCloseTimeoutTest.xml
similarity index 100%
rename from log4j-core/src/test/resources/KafkaAppenderCloseTimeoutTest.xml
rename to log4j-kafka/src/test/resources/KafkaAppenderCloseTimeoutTest.xml
diff --git a/log4j-core/src/test/resources/KafkaAppenderTest.xml b/log4j-kafka/src/test/resources/KafkaAppenderTest.xml
similarity index 86%
rename from log4j-core/src/test/resources/KafkaAppenderTest.xml
rename to log4j-kafka/src/test/resources/KafkaAppenderTest.xml
index 16768c8..725fb9b 100644
--- a/log4j-core/src/test/resources/KafkaAppenderTest.xml
+++ b/log4j-kafka/src/test/resources/KafkaAppenderTest.xml
@@ -37,12 +37,18 @@
       <Property name="timeout.ms">1000</Property>
       <Property name="bootstrap.servers">localhost:9092</Property>
     </Kafka>
+    <Kafka name="KafkaAppenderNoEventTimestamp" topic="kafka-topic" key="key" eventTimestamp="false">
+      <PatternLayout pattern="%m"/>
+      <Property name="timeout.ms">1000</Property>
+      <Property name="bootstrap.servers">localhost:9092</Property>
+    </Kafka>
   </Appenders>
   <Loggers>
     <Root level="info">
       <AppenderRef ref="KafkaAppenderWithLayout"/>
       <AppenderRef ref="AsyncKafkaAppender"/>
       <AppenderRef ref="KafkaAppenderWithKey"/>
+      <AppenderRef ref="KafkaAppenderNoEventTimestamp"/>
     </Root>
   </Loggers>
 </Configuration>
\ No newline at end of file
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index f9bf359..2f6fdbb 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -31,6 +31,9 @@
          - "remove" - Removed
     -->
     <release version="3.0.0" date="2019-xx-xx" description="GA Release 3.0.0">
+      <action issue="LOG4J2-2678" dev="rgoers" type="update" due-to="Federico D'Ambrosio">
+        Add LogEvent timestamp to ProducerRecord in KafkaAppender.
+      </action>
       <actino issue="LOG4J2-2688" dev="rgoers" type="add" due-to="Romain Manni-Bucau">
         Allow web lookup of session attributes.
       </actino>