[Issue 4803][client] return null if the message value/data is not set by producer (#6379)

Fixes #4803 

### Motivation
Allow the typed consumer receive messages with `null` value if the producer sends message without payload.

### Modifications
- add a flag in `MessageMetadata` to indicate if the payload is set when the message is created
- check and return `null` if the flag is not set when reading data from a message
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
index cd39d96..99962d8 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
@@ -39,6 +39,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -611,7 +612,7 @@
 
     private static Message<byte[]> createMessage(String content, String messageId) {
         return new MessageImpl<byte[]>("my-topic", messageId, Collections.emptyMap(),
-                                       content.getBytes(), Schema.BYTES);
+                                       content.getBytes(), Schema.BYTES, PulsarApi.MessageMetadata.newBuilder());
     }
 
     private static String createMessageId(long ledgerId, long entryId, long partitionIndex) {
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index ba67051..7f93d47 100644
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -45,6 +45,7 @@
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -80,7 +81,8 @@
         ClientBuilder builder = spy(new ClientBuilderImpl());
         PulsarSpout spout = spy(new PulsarSpout(conf, builder));
 
-        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), new byte[0], Schema.BYTES);
+        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(),
+                new byte[0], Schema.BYTES, PulsarApi.MessageMetadata.newBuilder());
         Consumer<byte[]> consumer = mock(Consumer.class);
         SpoutConsumer spoutConsumer = new SpoutConsumer(consumer);
         CompletableFuture<Void> future = new CompletableFuture<>();
@@ -154,7 +156,8 @@
         when(client.getSharedConsumer(any())).thenReturn(consumer);
         instances.put(componentId, client);
 
-        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), msgContent.getBytes(), Schema.BYTES);
+        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(),
+                msgContent.getBytes(), Schema.BYTES, PulsarApi.MessageMetadata.newBuilder());
         when(consumer.receive(anyInt(), any())).thenReturn(msg);
 
         spout.open(config, context, collector);