fix broker_timestamp not set bug (#9493)
### Motivation
When configure broker.conf with
`brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor`
and run KOP with Pulsar Broker, we get the following exception:
```
16:44:20.374 [pulsar-msg-expiry-monitor-27-1] WARN org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/__kafka/__consumer_offsets-partition-36] Error while getting the oldest mess
age
java.lang.IllegalStateException: Field 'broker_timestamp' is not set
at org.apache.pulsar.common.api.proto.BrokerEntryMetadata.getBrokerTimestamp(BrokerEntryMetadata.java:14) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at org.apache.pulsar.client.impl.MessageImpl.isExpired(MessageImpl.java:299) ~[org.apache.pulsar-pulsar-client-original-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.isOldestMessageExpired(PersistentTopic.java:2193) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at org.apache.pulsar.broker.service.persistent.PersistentSubscription.expireMessages(PersistentSubscription.java:891) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkMessageExpiry$34(PersistentTopic.java:1245) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at java.util.ArrayList.forEach(ArrayList.java:1257) ~[?:1.8.0_172]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkMessageExpiry(PersistentTopic.java:1245) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_172]
at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$51(BrokerService.java:1472) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:1470) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at org.apache.pulsar.broker.service.BrokerService.checkMessageExpiry(BrokerService.java:1411) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.12.1.jar:4.12.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_172]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_172]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_172]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_172]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.51.Final.jar:4.1.51.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
```
The reason is that it doesn't check whether `brokerTimestamp` has been set in brokerEntryMetadata, which is set by `
org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor`
```Java
public boolean isExpired(int messageTTLInSeconds) {
return messageTTLInSeconds != 0 && (brokerEntryMetadata == null
? (System.currentTimeMillis() >
getPublishTime() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds))
: (System.currentTimeMillis() >
brokerEntryMetadata.getBrokerTimestamp() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds)));
}
```
### Modification
1. check whether brokerTimestamp has been set before call `getBrokerTimestamp` method.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 5f5d965..1243c76 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -292,7 +292,7 @@
}
public boolean isExpired(int messageTTLInSeconds) {
- return messageTTLInSeconds != 0 && (brokerEntryMetadata == null
+ return messageTTLInSeconds != 0 && (brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp()
? (System.currentTimeMillis() >
getPublishTime() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds))
: (System.currentTimeMillis() >
@@ -300,7 +300,7 @@
}
public boolean publishedEarlierThan(long timestamp) {
- return brokerEntryMetadata == null ? getPublishTime() < timestamp
+ return brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp() ? getPublishTime() < timestamp
: brokerEntryMetadata.getBrokerTimestamp() < timestamp;
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index 99ca4f9..0a57d93 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -18,7 +18,11 @@
*/
package org.apache.pulsar.client.impl;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;
@@ -29,10 +33,14 @@
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.Assert;
+import static org.testng.AssertJUnit.fail;
import org.testng.annotations.Test;
import static org.mockito.Mockito.any;
@@ -416,4 +424,63 @@
MessageImpl<Boolean> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), BooleanSchema.of());
assertNull(msg.getValue());
}
+
+ @Test(timeOut = 30000)
+ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
+ int MOCK_BATCH_SIZE = 10;
+ String data = "test-message";
+ ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+ byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+
+ try {
+ // test BrokerTimestamp not set.
+ MessageMetadata messageMetadata = new MessageMetadata()
+ .setPublishTime(1)
+ .setProducerName("test")
+ .setSequenceId(1);
+ byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+ BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata()
+ .setIndex(MOCK_BATCH_SIZE - 1);
+
+ int brokerMetaSize = brokerMetadata.getSerializedSize();
+ ByteBuf brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
+ brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+ brokerMeta.writeInt(brokerMetaSize);
+ brokerMetadata.writeTo(brokerMeta);
+
+ CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+ compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+ MessageImpl messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
+ MessageImpl message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
+ message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
+ assertTrue(message.isExpired(100));
+
+ // test BrokerTimestamp set.
+ byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+ byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+ messageMetadata = new MessageMetadata()
+ .setPublishTime(System.currentTimeMillis())
+ .setProducerName("test")
+ .setSequenceId(1);
+ byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+ brokerMetadata = new BrokerEntryMetadata()
+ .setBrokerTimestamp(System.currentTimeMillis())
+ .setIndex(MOCK_BATCH_SIZE - 1);
+
+ brokerMetaSize = brokerMetadata.getSerializedSize();
+ brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
+ brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+ brokerMeta.writeInt(brokerMetaSize);
+ brokerMetadata.writeTo(brokerMeta);
+
+ compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+ compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+ messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
+ message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
+ message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
+ assertFalse(message.isExpired(24 * 3600));
+ } catch (IOException e) {
+ fail();
+ }
+ }
}