Merge branch 'fedexist-kafka-timestamp'
diff --git a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java
index 8f7de26..36ab5ec 100644
--- a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java
+++ b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java
@@ -59,6 +59,9 @@
@PluginAttribute(defaultBoolean = true)
private boolean syncSend;
+ @PluginAttribute(value = "eventTimestamp", defaultBoolean = true)
+ private boolean sendEventTimestamp;
+
@SuppressWarnings("resource")
@Override
public KafkaAppender build() {
@@ -68,7 +71,7 @@
return null;
}
final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(),
- getName(), topic, syncSend, getPropertyArray(), key);
+ getName(), topic, syncSend, sendEventTimestamp, getPropertyArray(), key);
return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), getPropertyArray(), kafkaManager);
}
@@ -80,6 +83,10 @@
return syncSend;
}
+ public boolean isSendEventTimestamp() {
+ return sendEventTimestamp;
+ }
+
public B setTopic(final String topic) {
this.topic = topic;
return asBuilder();
@@ -94,6 +101,11 @@
this.syncSend = syncSend;
return asBuilder();
}
+
+ public B setSendEventTimestamp(boolean sendEventTimestamp) {
+ this.sendEventTimestamp = sendEventTimestamp;
+ return asBuilder();
+ }
}
/**
@@ -130,8 +142,11 @@
private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
final Layout<? extends Serializable> layout = getLayout();
byte[] data;
+ Long eventTimestamp;
+
data = layout.toByteArray(event);
- manager.send(data);
+ eventTimestamp = event.getTimeMillis();
+ manager.send(data, eventTimestamp);
}
@Override
diff --git a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
index 2085cb3..4f82f07 100644
--- a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
+++ b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
@@ -51,6 +51,7 @@
private final String topic;
private final String key;
private final boolean syncSend;
+ private final boolean sendTimestamp;
private static final KafkaManagerFactory factory = new KafkaManagerFactory();
@@ -58,10 +59,11 @@
* The Constructor should have been declared private as all Managers are created by the internal factory;
*/
private KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend,
- final Property[] properties, final String key) {
+ final boolean sendTimestamp, final Property[] properties, final String key) {
super(loggerContext, name);
this.topic = Objects.requireNonNull(topic, "topic");
this.syncSend = syncSend;
+ this.sendTimestamp = sendTimestamp;
config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
config.setProperty("batch.size", "0");
@@ -106,9 +108,10 @@
}
}
- public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
+ public void send(final byte[] msg, final Long eventTimestamp) throws ExecutionException, InterruptedException, TimeoutException {
if (producer != null) {
byte[] newKey = null;
+ Long timestamp = null;
if(key != null && key.contains("${")) {
newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
@@ -116,7 +119,11 @@
newKey = key.getBytes(StandardCharsets.UTF_8);
}
- final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
+ if(sendTimestamp) {
+ timestamp = eventTimestamp;
+ }
+
+ final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, null, timestamp, newKey, msg);
if (syncSend) {
final Future<RecordMetadata> response = producer.send(newRecord);
response.get(timeoutMillis, TimeUnit.MILLISECONDS);
@@ -142,26 +149,28 @@
}
public static KafkaManager getManager(final LoggerContext loggerContext, final String name, final String topic,
- final boolean syncSend, final Property[] properties, final String key) {
+ final boolean syncSend, final boolean sendTimestamp, final Property[] properties, final String key) {
StringBuilder sb = new StringBuilder(name);
for (Property prop: properties) {
sb.append(" ").append(prop.getName()).append("=").append(prop.getValue());
}
- return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, properties, key));
+ return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, sendTimestamp, properties, key));
}
private static class FactoryData {
private final LoggerContext loggerContext;
private final String topic;
private final boolean syncSend;
+ private final boolean sendTimestamp;
private final Property[] properties;
private final String key;
public FactoryData(final LoggerContext loggerContext, final String topic, final boolean syncSend,
- final Property[] properties, final String key) {
+ final boolean sendTimestamp, final Property[] properties, final String key) {
this.loggerContext = loggerContext;
this.topic = topic;
this.syncSend = syncSend;
+ this.sendTimestamp = sendTimestamp;
this.properties = properties;
this.key = key;
}
@@ -171,7 +180,7 @@
private static class KafkaManagerFactory implements ManagerFactory<KafkaManager, FactoryData> {
@Override
public KafkaManager createManager(String name, FactoryData data) {
- return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.properties, data.key);
+ return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.sendTimestamp, data.properties, data.key);
}
}
diff --git a/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaAppenderTest.java b/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaAppenderTest.java
index a4da443..f726fd4 100644
--- a/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaAppenderTest.java
+++ b/log4j-kafka/src/test/java/org/apache/logging/log4j/kafka/appender/KafkaAppenderTest.java
@@ -126,6 +126,7 @@
assertNotNull(item);
assertEquals(TOPIC_NAME, item.topic());
byte[] keyValue = "key".getBytes(StandardCharsets.UTF_8);
+ assertEquals(Long.valueOf(logEvent.getTimeMillis()), item.timestamp());
assertArrayEquals(item.key(), keyValue);
assertEquals(LOG_MESSAGE, new String(item.value(), StandardCharsets.UTF_8));
}
@@ -143,11 +144,28 @@
assertNotNull(item);
assertEquals(TOPIC_NAME, item.topic());
byte[] keyValue = format.format(date).getBytes(StandardCharsets.UTF_8);
+ assertEquals(Long.valueOf(logEvent.getTimeMillis()), item.timestamp());
assertArrayEquals(item.key(), keyValue);
assertEquals(LOG_MESSAGE, new String(item.value(), StandardCharsets.UTF_8));
}
+ @Test
+ public void testAppenderNoEventTimestamp() throws Exception {
+ final Appender appender = ctx.getRequiredAppender("KafkaAppenderNoEventTimestamp");
+ final LogEvent logEvent = createLogEvent();
+ appender.append(logEvent);
+ final List<ProducerRecord<byte[], byte[]>> history = kafka.history();
+ assertEquals(1, history.size());
+ final ProducerRecord<byte[], byte[]> item = history.get(0);
+ assertNotNull(item);
+ assertEquals(TOPIC_NAME, item.topic());
+ byte[] keyValue = "key".getBytes(StandardCharsets.UTF_8);
+ assertArrayEquals(item.key(), keyValue);
+ assertNotEquals(Long.valueOf(logEvent.getTimeMillis()), item.timestamp());
+ assertEquals(LOG_MESSAGE, new String(item.value(), StandardCharsets.UTF_8));
+ }
+
private LogEvent deserializeLogEvent(final byte[] data) throws IOException, ClassNotFoundException {
final ByteArrayInputStream bis = new ByteArrayInputStream(data);
try (ObjectInput ois = new FilteredObjectInputStream(bis)) {
diff --git a/log4j-core/src/test/resources/KafkaAppenderCloseTimeoutTest.xml b/log4j-kafka/src/test/resources/KafkaAppenderCloseTimeoutTest.xml
similarity index 100%
rename from log4j-core/src/test/resources/KafkaAppenderCloseTimeoutTest.xml
rename to log4j-kafka/src/test/resources/KafkaAppenderCloseTimeoutTest.xml
diff --git a/log4j-core/src/test/resources/KafkaAppenderTest.xml b/log4j-kafka/src/test/resources/KafkaAppenderTest.xml
similarity index 86%
rename from log4j-core/src/test/resources/KafkaAppenderTest.xml
rename to log4j-kafka/src/test/resources/KafkaAppenderTest.xml
index 16768c8..725fb9b 100644
--- a/log4j-core/src/test/resources/KafkaAppenderTest.xml
+++ b/log4j-kafka/src/test/resources/KafkaAppenderTest.xml
@@ -37,12 +37,18 @@
<Property name="timeout.ms">1000</Property>
<Property name="bootstrap.servers">localhost:9092</Property>
</Kafka>
+ <Kafka name="KafkaAppenderNoEventTimestamp" topic="kafka-topic" key="key" eventTimestamp="false">
+ <PatternLayout pattern="%m"/>
+ <Property name="timeout.ms">1000</Property>
+ <Property name="bootstrap.servers">localhost:9092</Property>
+ </Kafka>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="KafkaAppenderWithLayout"/>
<AppenderRef ref="AsyncKafkaAppender"/>
<AppenderRef ref="KafkaAppenderWithKey"/>
+ <AppenderRef ref="KafkaAppenderNoEventTimestamp"/>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index f9bf359..2f6fdbb 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -31,6 +31,9 @@
- "remove" - Removed
-->
<release version="3.0.0" date="2019-xx-xx" description="GA Release 3.0.0">
+ <action issue="LOG4J2-2678" dev="rgoers" type="update" due-to="Federico D'Ambrosio">
+ Add LogEvent timestamp to ProducerRecord in KafkaAppender.
+ </action>
<action issue="LOG4J2-2688" dev="rgoers" type="add" due-to="Romain Manni-Bucau">
Allow web lookup of session attributes.
</action>