Add support for LogEvent timestamp embedding in ProducerRecord
diff --git a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java
index b9b5f5c..5f8e320 100644
--- a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java
+++ b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaAppender.java
@@ -59,6 +59,9 @@
@PluginAttribute(value = "syncSend", defaultBoolean = true)
private boolean syncSend;
+ @PluginAttribute(value = "timestamp", defaultBoolean = false)
+ private boolean sendTimestamp;
+
@SuppressWarnings("resource")
@Override
public KafkaAppender build() {
@@ -68,7 +71,7 @@
return null;
}
final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(),
- getName(), topic, syncSend, getPropertyArray(), key);
+ getName(), topic, syncSend, sendTimestamp, getPropertyArray(), key);
return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), getPropertyArray(), kafkaManager);
}
@@ -80,6 +83,8 @@
return syncSend;
}
+ public boolean isSendTimestamp() { return sendTimestamp; }
+
public B setTopic(final String topic) {
this.topic = topic;
return asBuilder();
@@ -89,6 +94,11 @@
this.syncSend = syncSend;
return asBuilder();
}
+
+ public B setSendTimestamp(boolean sendTimestamp) {
+ this.sendTimestamp = sendTimestamp;
+ return asBuilder();
+ }
}
/**
@@ -125,8 +135,11 @@
private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
final Layout<? extends Serializable> layout = getLayout();
byte[] data;
+ Long eventTimestamp;
+
data = layout.toByteArray(event);
- manager.send(data);
+ eventTimestamp = event.getTimeMillis();
+ manager.send(data, eventTimestamp);
}
@Override
diff --git a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
index 2085cb3..4f82f07 100644
--- a/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
+++ b/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java
@@ -51,6 +51,7 @@
private final String topic;
private final String key;
private final boolean syncSend;
+ private final boolean sendTimestamp;
private static final KafkaManagerFactory factory = new KafkaManagerFactory();
@@ -58,10 +59,11 @@
* The Constructor should have been declared private as all Managers are create by the internal factory;
*/
private KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend,
- final Property[] properties, final String key) {
+ final boolean sendTimestamp, final Property[] properties, final String key) {
super(loggerContext, name);
this.topic = Objects.requireNonNull(topic, "topic");
this.syncSend = syncSend;
+ this.sendTimestamp = sendTimestamp;
config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
config.setProperty("batch.size", "0");
@@ -106,9 +108,10 @@
}
}
- public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
+ public void send(final byte[] msg, final Long eventTimestamp) throws ExecutionException, InterruptedException, TimeoutException {
if (producer != null) {
byte[] newKey = null;
+ Long timestamp = null;
if(key != null && key.contains("${")) {
newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
@@ -116,7 +119,11 @@
newKey = key.getBytes(StandardCharsets.UTF_8);
}
- final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
+ if(sendTimestamp) {
+ timestamp = eventTimestamp;
+ }
+
+ final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, null, timestamp, newKey, msg);
if (syncSend) {
final Future<RecordMetadata> response = producer.send(newRecord);
response.get(timeoutMillis, TimeUnit.MILLISECONDS);
@@ -142,26 +149,28 @@
}
public static KafkaManager getManager(final LoggerContext loggerContext, final String name, final String topic,
- final boolean syncSend, final Property[] properties, final String key) {
+ final boolean syncSend, final boolean sendTimestamp, final Property[] properties, final String key) {
StringBuilder sb = new StringBuilder(name);
for (Property prop: properties) {
sb.append(" ").append(prop.getName()).append("=").append(prop.getValue());
}
- return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, properties, key));
+ return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, sendTimestamp, properties, key));
}
private static class FactoryData {
private final LoggerContext loggerContext;
private final String topic;
private final boolean syncSend;
+ private final boolean sendTimestamp;
private final Property[] properties;
private final String key;
public FactoryData(final LoggerContext loggerContext, final String topic, final boolean syncSend,
- final Property[] properties, final String key) {
+ final boolean sendTimestamp, final Property[] properties, final String key) {
this.loggerContext = loggerContext;
this.topic = topic;
this.syncSend = syncSend;
+ this.sendTimestamp = sendTimestamp;
this.properties = properties;
this.key = key;
}
@@ -171,7 +180,7 @@
private static class KafkaManagerFactory implements ManagerFactory<KafkaManager, FactoryData> {
@Override
public KafkaManager createManager(String name, FactoryData data) {
- return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.properties, data.key);
+ return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.sendTimestamp, data.properties, data.key);
}
}