ISSUE-5934: Support read/write properties from/to Message in flink pulsar consumer/producer (#5955)

Fix #5934

Motivation
Support reading properties from, and writing properties to, a Pulsar Message in the Flink Pulsar consumer and producer; the default behavior can be overridden in a derived class or replaced with a user-provided extractor.

Modifications

1. Change the access modifier of `PulsarConsumerSource.deserialize` from `private` to `protected`.
2. Add the `PulsarPropertiesExtractor<T>` interface (`Map<String, String> getProperties(T in)`); `FlinkPulsarProducer` accepts an extractor and passes its result to `TypedMessageBuilder.properties()` to attach the properties to the Pulsar Message.

* fix unit test failure

Co-authored-by: herodu <herodu@tencent.com>
Co-authored-by: Sijie Guo <sijie@apache.org>
Co-authored-by: duli <554979476@163.com>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 376439d..cf3c657 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -22,6 +22,7 @@
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import java.util.function.Function;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -33,6 +34,7 @@
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
 import org.apache.flink.util.SerializableObject;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.MessageId;
@@ -68,6 +70,11 @@
     protected final PulsarKeyExtractor<T> flinkPulsarKeyExtractor;
 
     /**
+     * User-provided properties extractor for attaching properties to a pulsar message.
+     */
+    protected final PulsarPropertiesExtractor<T> flinkPulsarPropertiesExtractor;
+
+    /**
      * Produce Mode.
      */
     protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONCE;
@@ -110,7 +117,8 @@
                                String defaultTopicName,
                                Authentication authentication,
                                SerializationSchema<T> serializationSchema,
-                               PulsarKeyExtractor<T> keyExtractor) {
+                               PulsarKeyExtractor<T> keyExtractor,
+                               PulsarPropertiesExtractor<T> propertiesExtractor) {
         checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
         checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
         checkNotNull(authentication, "auth cannot be null, set disabled for no auth");
@@ -123,17 +131,20 @@
         this.producerConf.setTopicName(defaultTopicName);
         this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
         this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
+        this.flinkPulsarPropertiesExtractor = getOrNullPropertiesExtractor(propertiesExtractor);
         ClosureCleaner.ensureSerializable(serializationSchema);
     }
 
     public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData,
                                ProducerConfigurationData producerConfigurationData,
                                SerializationSchema<T> serializationSchema,
-                               PulsarKeyExtractor<T> keyExtractor) {
+                               PulsarKeyExtractor<T> keyExtractor,
+                               PulsarPropertiesExtractor<T> propertiesExtractor) {
         this.clientConf = checkNotNull(clientConfigurationData, "client conf can not be null");
         this.producerConf = checkNotNull(producerConfigurationData, "producer conf can not be null");
         this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
         this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
+        this.flinkPulsarPropertiesExtractor = getOrNullPropertiesExtractor(propertiesExtractor);
         ClosureCleaner.ensureSerializable(serializationSchema);
     }
 
@@ -148,6 +159,13 @@
     }
 
     /**
+     * @return pulsar properties extractor.
+     */
+    public PulsarPropertiesExtractor<T> getPulsarPropertiesExtractor() {
+        return flinkPulsarPropertiesExtractor;
+    }
+
+    /**
      * Gets this producer's operating mode.
      */
     public PulsarProduceMode getProduceMode() {
@@ -185,6 +203,16 @@
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private static <T> PulsarPropertiesExtractor<T> getOrNullPropertiesExtractor(
+            PulsarPropertiesExtractor<T> extractor) {
+        if (null == extractor) {
+            return PulsarPropertiesExtractor.EMPTY;
+        } else {
+            return extractor;
+        }
+    }
+
     private Producer<byte[]> createProducer() throws Exception {
         PulsarClientImpl client = CachedPulsarClient.getOrCreate(clientConf);
         return client.createProducerAsync(producerConf).get();
@@ -257,6 +285,7 @@
             }
         }
         msgBuilder.value(serializedValue)
+                .properties(this.flinkPulsarPropertiesExtractor.getProperties(value))
                 .sendAsync()
                 .thenApply(successCallback)
                 .exceptionally(failureCallback);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
index d4602d9..51b3572 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -34,6 +34,7 @@
 import org.apache.flink.formats.avro.AvroRowSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
@@ -53,6 +54,7 @@
     protected String[] fieldNames;
     protected TypeInformation[] fieldTypes;
     protected PulsarKeyExtractor<Row> keyExtractor;
+    protected PulsarPropertiesExtractor<Row> propertiesExtractor;
     private Class<? extends SpecificRecord> recordClazz;
 
     /**
@@ -106,7 +108,8 @@
                 clientConfigurationData,
                 producerConfigurationData,
                 serializationSchema,
-                keyExtractor);
+                keyExtractor,
+                propertiesExtractor);
     }
 
     @Override
@@ -151,6 +154,7 @@
                 fieldNames,
                 fieldTypes,
                 recordClazz);
+        sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY;
 
         return sink;
     }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 9606dfc..9570f17 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -164,7 +164,7 @@
         }
     }
 
-    private T deserialize(Message message) throws IOException {
+    protected T deserialize(Message message) throws IOException {
         return deserializer.deserialize(message.getData());
     }
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
index b89b60b..6d7479c 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
@@ -30,6 +30,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
@@ -46,6 +47,7 @@
     protected ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
     protected SerializationSchema<Row> serializationSchema;
     protected PulsarKeyExtractor<Row> keyExtractor;
+    protected PulsarPropertiesExtractor<Row> propertiesExtractor;
     protected String[] fieldNames;
     protected TypeInformation[] fieldTypes;
     protected final String routingKeyFieldName;
@@ -95,7 +97,8 @@
             clientConfigurationData,
             producerConfigurationData,
             serializationSchema,
-            keyExtractor);
+            keyExtractor,
+            propertiesExtractor);
     }
 
     @Override
@@ -141,6 +144,7 @@
                 routingKeyFieldName,
                 fieldNames,
                 fieldTypes);
+        sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY;
 
         return sink;
     }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java
new file mode 100644
index 0000000..67e86f3
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar.partitioner;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Extract message properties from a value or others.
+ */
+public interface PulsarPropertiesExtractor<T> extends Serializable {
+
+    PulsarPropertiesExtractor EMPTY = in -> Collections.emptyMap();
+
+    /**
+     * Retrieve properties from the value or others.
+     *
+     * @param in the value to extract properties from.
+     * @return the properties to attach to the message.
+     */
+    Map<String, String> getProperties(T in);
+
+}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
index 83a7549..49b50bb 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -24,6 +24,7 @@
 import org.apache.flink.avro.generated.NasaMission;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.apache.pulsar.client.api.Authentication;
@@ -107,12 +108,14 @@
                 Mockito.anyString(),
                 Mockito.any(Authentication.class),
                 Mockito.any(SerializationSchema.class),
-                Mockito.any(PulsarKeyExtractor.class)
+                Mockito.any(PulsarKeyExtractor.class),
+                Mockito.any(PulsarPropertiesExtractor.class)
         ).thenReturn(producer);
         FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
         FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
         FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
         FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
+        FieldUtils.writeField(sink, "propertiesExtractor", Mockito.mock(PulsarPropertiesExtractor.class), true);
         return sink;
     }
 
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
index 02462b3..d895a5f 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.apache.pulsar.client.api.Authentication;
@@ -103,13 +104,15 @@
                 Mockito.anyString(),
                 Mockito.any(Authentication.class),
                 Mockito.any(SerializationSchema.class),
-                Mockito.any(PulsarKeyExtractor.class)
+                Mockito.any(PulsarKeyExtractor.class),
+                Mockito.any(PulsarPropertiesExtractor.class)
         ).thenReturn(producer);
 
         FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
         FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
         FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
         FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
+        FieldUtils.writeField(sink, "propertiesExtractor", Mockito.mock(PulsarPropertiesExtractor.class), true);
         return sink;
     }
 }