fix broker_timestamp not set bug (#9493)

### Motivation
When configure broker.conf with 
`brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor` 
and run KOP with Pulsar Broker, we get the following exception:
```
16:44:20.374 [pulsar-msg-expiry-monitor-27-1] WARN  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/__kafka/__consumer_offsets-partition-36] Error while getting the oldest mess
age
java.lang.IllegalStateException: Field 'broker_timestamp' is not set
        at org.apache.pulsar.common.api.proto.BrokerEntryMetadata.getBrokerTimestamp(BrokerEntryMetadata.java:14) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.client.impl.MessageImpl.isExpired(MessageImpl.java:299) ~[org.apache.pulsar-pulsar-client-original-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.isOldestMessageExpired(PersistentTopic.java:2193) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.broker.service.persistent.PersistentSubscription.expireMessages(PersistentSubscription.java:891) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkMessageExpiry$34(PersistentTopic.java:1245) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at java.util.ArrayList.forEach(ArrayList.java:1257) ~[?:1.8.0_172]
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkMessageExpiry(PersistentTopic.java:1245) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_172]
        at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$51(BrokerService.java:1472) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:1470) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.pulsar.broker.service.BrokerService.checkMessageExpiry(BrokerService.java:1411) ~[org.apache.pulsar-pulsar-broker-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.8.0-rc-202101252233.jar:2.8.0-rc-202101252233]
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.12.1.jar:4.12.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_172]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_172]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_172]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.51.Final.jar:4.1.51.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
```

The reason is that it doesn't check whether `brokerTimestamp` has been set in brokerEntryMetadata, which is set by `
org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor`
```Java
public boolean isExpired(int messageTTLInSeconds) {
        return messageTTLInSeconds != 0 && (brokerEntryMetadata == null
                ? (System.currentTimeMillis() >
                    getPublishTime() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds))
                : (System.currentTimeMillis() >
                    brokerEntryMetadata.getBrokerTimestamp() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds)));
    }
```

### Modification
1. check whether brokerTimestamp has been set before call `getBrokerTimestamp` method.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 5f5d965..1243c76 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -292,7 +292,7 @@
     }
 
     public boolean isExpired(int messageTTLInSeconds) {
-        return messageTTLInSeconds != 0 && (brokerEntryMetadata == null
+        return messageTTLInSeconds != 0 && (brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp()
                 ? (System.currentTimeMillis() >
                     getPublishTime() + TimeUnit.SECONDS.toMillis(messageTTLInSeconds))
                 : (System.currentTimeMillis() >
@@ -300,7 +300,7 @@
     }
 
     public boolean publishedEarlierThan(long timestamp) {
-        return brokerEntryMetadata == null ? getPublishTime() < timestamp
+        return brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp() ? getPublishTime() < timestamp
                 : brokerEntryMetadata.getBrokerTimestamp() < timestamp;
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index 99ca4f9..0a57d93 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -18,7 +18,11 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 
 import java.util.concurrent.CompletableFuture;
@@ -29,10 +33,14 @@
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
 import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.testng.Assert;
+import static org.testng.AssertJUnit.fail;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.any;
@@ -416,4 +424,63 @@
         MessageImpl<Boolean> msg = MessageImpl.create(builder, ByteBuffer.wrap(encodeBytes), BooleanSchema.of());
         assertNull(msg.getValue());
     }
+
+    @Test(timeOut = 30000)
+    public void testMessageBrokerAndEntryMetadataTimestampMissed() {
+        int MOCK_BATCH_SIZE = 10;
+        String data = "test-message";
+        ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+        byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+
+        try {
+            // test BrokerTimestamp not set.
+            MessageMetadata messageMetadata = new MessageMetadata()
+                    .setPublishTime(1)
+                    .setProducerName("test")
+                    .setSequenceId(1);
+            byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+            BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata()
+                            .setIndex(MOCK_BATCH_SIZE - 1);
+
+            int brokerMetaSize = brokerMetadata.getSerializedSize();
+            ByteBuf  brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
+            brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+            brokerMeta.writeInt(brokerMetaSize);
+            brokerMetadata.writeTo(brokerMeta);
+
+            CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+            compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+            MessageImpl messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
+            MessageImpl message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
+            message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
+            assertTrue(message.isExpired(100));
+
+            // test BrokerTimestamp set.
+            byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+            byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+            messageMetadata = new MessageMetadata()
+                    .setPublishTime(System.currentTimeMillis())
+                    .setProducerName("test")
+                    .setSequenceId(1);
+            byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+            brokerMetadata = new BrokerEntryMetadata()
+                    .setBrokerTimestamp(System.currentTimeMillis())
+                    .setIndex(MOCK_BATCH_SIZE - 1);
+
+            brokerMetaSize = brokerMetadata.getSerializedSize();
+            brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
+            brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+            brokerMeta.writeInt(brokerMetaSize);
+            brokerMetadata.writeTo(brokerMeta);
+
+            compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+            compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+            messageWithEntryMetadata = MessageImpl.deserializeBrokerEntryMetaDataFirst(compositeByteBuf);
+            message = MessageImpl.deserializeSkipBrokerEntryMetaData(compositeByteBuf);
+            message.setBrokerEntryMetadata(messageWithEntryMetadata.getBrokerEntryMetadata());
+            assertFalse(message.isExpired(24 * 3600));
+        } catch (IOException e) {
+            fail();
+        }
+    }
 }