tree 93bfa39ad7fb173a0683eedd3268c80d2769013b
parent fb605a72ebc38e9a6e2154daedcacebfa8c5fc68
author hangc0276 <hangc0276@163.com> 1617198863 +0800
committer GitHub <noreply@github.com> 1617198863 +0800
gpgsig -----BEGIN PGP SIGNATURE-----
 
 wsBcBAABCAAQBQJgZH8PCRBK7hj4Ov3rIwAAdHIIABiWElR87LB48OZ6Or/Wd1mI
 yw3/OKsDnruyycpNpXP4ykfbTzkgaQBo80OzEpBNg/VhBbz4MIxYF8w9aceX6pSN
 M69Z+aqxYLAg83EyeN3HY76z6/ehitootTx3E0aGfc8/kUu266Q7XudT8JGPaelR
 GAgXzmSE26ctrLDrvcde96/EZLx87UBAN7Pw/IyHu5ihhlnz1gBXoh27i+i4r4D/
 Qtk0iRE5WRkbiEi5criP1eVy7q7zkbMIzJZqks/RRznA0YighgUq41gRylau3cgM
 af0AKSmf6L9tagjpXihC+y3wMGD6aN2H3LLswOk6n5ulOAyfh1VkSKftlHr7PuI=
 =RdnB
 -----END PGP SIGNATURE-----
 

fix broker_timestamp not set bug (#9493)

### Motivation
When configure broker.conf with 
`brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor` 
and run KOP with Pulsar Broker, we get the following exception:
```
16:44:20.374 [pulsar-msg-expiry-monitor-27-1] WARN  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/__kafka/__consumer_offsets-partition-36] Error while getting the oldest mess
age
java.lang.IllegalStateException: Field 'broker_timestamp' is not set
        at org.apache.pulsar.common.api.proto.BrokerEntryMetadata.getBrokerTimestamp(BrokerEntryMetadata.java:14) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.client.impl.MessageImpl.isExpired(MessageImpl.java:299) ~[org.apache.pulsar-pulsar-client-original-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.isOldestMessageExpired(PersistentTopic.java:2193) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.expireMessages(PersistentSubscription.java:891) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkMessageExpiry$34(PersistentTopic.java:1245) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at java.util.ArrayList.forEach(ArrayList.java:1257) ~[?:1.8.0_172]
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkMessageExpiry(PersistentTopic.java:1245) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_172]
        at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$51(BrokerService.java:1472) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:1470) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.broker.service.BrokerService.checkMessageExpiry(BrokerService.java:1411) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.12.1.jar:4.12.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_172]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_172]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_172]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.51.Final.jar:4.1.51.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
```

The reason is that it doesn't check whether `brokerTimestamp` has been set in brokerEntryMetadata, which is set by `
org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor`
```Java
public boolean isExpired(int messageTTLInSeconds) {
        return messageTTLInSeconds != 0 && (brokerEntryMetadata == null
                ? (System.currentTimeMillis() >
                    getPublishTime() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds))
                : (System.currentTimeMillis() >
                    brokerEntryMetadata.getBrokerTimestamp() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds)));
    }
```

### Modification
1. check whether brokerTimestamp has been set before call `getBrokerTimestamp` method.