ISSUE-5934: Support read/write properties from/to Message in flink pulsar consumer/producer (#5955)
Fix #5934
Motivation
Support reading and writing message properties in the Flink Pulsar consumer and producer, and allow derived classes to override this behavior.
Modifications
1. Change the access modifier of `PulsarConsumerSource.deserialize` from `private` to `protected` so that derived classes can override it.
2. Add the `PulsarPropertiesExtractor<T>` interface and a `propertiesExtractor` constructor parameter to `FlinkPulsarProducer`; the map returned by `getProperties(T value)` is passed to `TypedMessageBuilder.properties()` so that it is attached to the Pulsar message.
* fix unit test failure
Co-authored-by: herodu <herodu@tencent.com>
Co-authored-by: Sijie Guo <sijie@apache.org>
Co-authored-by: duli <554979476@163.com>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 376439d..cf3c657 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -22,6 +22,7 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
import java.util.function.Function;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -33,6 +34,7 @@
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.MessageId;
@@ -68,6 +70,11 @@
protected final PulsarKeyExtractor<T> flinkPulsarKeyExtractor;
/**
+ * User-provided properties extractor for assigning properties to a pulsar message.
+ */
+ protected final PulsarPropertiesExtractor<T> flinkPulsarPropertiesExtractor;
+
+ /**
* Produce Mode.
*/
protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONCE;
@@ -110,7 +117,8 @@
String defaultTopicName,
Authentication authentication,
SerializationSchema<T> serializationSchema,
- PulsarKeyExtractor<T> keyExtractor) {
+ PulsarKeyExtractor<T> keyExtractor,
+ PulsarPropertiesExtractor<T> propertiesExtractor) {
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
checkNotNull(authentication, "auth cannot be null, set disabled for no auth");
@@ -123,17 +131,20 @@
this.producerConf.setTopicName(defaultTopicName);
this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
+ this.flinkPulsarPropertiesExtractor = getOrNullPropertiesExtractor(propertiesExtractor);
ClosureCleaner.ensureSerializable(serializationSchema);
}
public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData,
ProducerConfigurationData producerConfigurationData,
SerializationSchema<T> serializationSchema,
- PulsarKeyExtractor<T> keyExtractor) {
+ PulsarKeyExtractor<T> keyExtractor,
+ PulsarPropertiesExtractor<T> propertiesExtractor) {
this.clientConf = checkNotNull(clientConfigurationData, "client conf can not be null");
this.producerConf = checkNotNull(producerConfigurationData, "producer conf can not be null");
this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
+ this.flinkPulsarPropertiesExtractor = getOrNullPropertiesExtractor(propertiesExtractor);
ClosureCleaner.ensureSerializable(serializationSchema);
}
@@ -148,6 +159,13 @@
}
/**
+ * @return pulsar properties extractor.
+ */
+ public PulsarPropertiesExtractor<T> getPulsarPropertiesExtractor() {
+ return flinkPulsarPropertiesExtractor;
+ }
+
+ /**
* Gets this producer's operating mode.
*/
public PulsarProduceMode getProduceMode() {
@@ -185,6 +203,16 @@
}
}
+ @SuppressWarnings("unchecked")
+ private static <T> PulsarPropertiesExtractor<T> getOrNullPropertiesExtractor(
+ PulsarPropertiesExtractor<T> extractor) {
+ if (null == extractor) {
+ return PulsarPropertiesExtractor.EMPTY;
+ } else {
+ return extractor;
+ }
+ }
+
private Producer<byte[]> createProducer() throws Exception {
PulsarClientImpl client = CachedPulsarClient.getOrCreate(clientConf);
return client.createProducerAsync(producerConf).get();
@@ -257,6 +285,7 @@
}
}
msgBuilder.value(serializedValue)
+ .properties(this.flinkPulsarPropertiesExtractor.getProperties(value))
.sendAsync()
.thenApply(successCallback)
.exceptionally(failureCallback);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
index d4602d9..51b3572 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -34,6 +34,7 @@
import org.apache.flink.formats.avro.AvroRowSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
@@ -53,6 +54,7 @@
protected String[] fieldNames;
protected TypeInformation[] fieldTypes;
protected PulsarKeyExtractor<Row> keyExtractor;
+ protected PulsarPropertiesExtractor<Row> propertiesExtractor;
private Class<? extends SpecificRecord> recordClazz;
/**
@@ -106,7 +108,8 @@
clientConfigurationData,
producerConfigurationData,
serializationSchema,
- keyExtractor);
+ keyExtractor,
+ propertiesExtractor);
}
@Override
@@ -151,6 +154,7 @@
fieldNames,
fieldTypes,
recordClazz);
+ sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY;
return sink;
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 9606dfc..9570f17 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -164,7 +164,7 @@
}
}
- private T deserialize(Message message) throws IOException {
+ protected T deserialize(Message message) throws IOException {
return deserializer.deserialize(message.getData());
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
index b89b60b..6d7479c 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
@@ -30,6 +30,7 @@
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
@@ -46,6 +47,7 @@
protected ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
protected SerializationSchema<Row> serializationSchema;
protected PulsarKeyExtractor<Row> keyExtractor;
+ protected PulsarPropertiesExtractor<Row> propertiesExtractor;
protected String[] fieldNames;
protected TypeInformation[] fieldTypes;
protected final String routingKeyFieldName;
@@ -95,7 +97,8 @@
clientConfigurationData,
producerConfigurationData,
serializationSchema,
- keyExtractor);
+ keyExtractor,
+ propertiesExtractor);
}
@Override
@@ -141,6 +144,7 @@
routingKeyFieldName,
fieldNames,
fieldTypes);
+ sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY;
return sink;
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java
new file mode 100644
index 0000000..67e86f3
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar.partitioner;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Extracts the properties of a pulsar message from a value.
+ */
+public interface PulsarPropertiesExtractor<T> extends Serializable {
+
+ PulsarPropertiesExtractor EMPTY = in -> Collections.emptyMap();
+
+ /**
+ * Retrieves the message properties for the given value.
+ *
+ * @param in the value to extract properties from.
+ * @return the properties to attach to the message.
+ */
+ Map<String, String> getProperties(T in);
+
+}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
index 83a7549..49b50bb 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -24,6 +24,7 @@
import org.apache.flink.avro.generated.NasaMission;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.pulsar.client.api.Authentication;
@@ -107,12 +108,14 @@
Mockito.anyString(),
Mockito.any(Authentication.class),
Mockito.any(SerializationSchema.class),
- Mockito.any(PulsarKeyExtractor.class)
+ Mockito.any(PulsarKeyExtractor.class),
+ Mockito.any(PulsarPropertiesExtractor.class)
).thenReturn(producer);
FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
+ FieldUtils.writeField(sink, "propertiesExtractor", Mockito.mock(PulsarPropertiesExtractor.class), true);
return sink;
}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
index 02462b3..d895a5f 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.pulsar.client.api.Authentication;
@@ -103,13 +104,15 @@
Mockito.anyString(),
Mockito.any(Authentication.class),
Mockito.any(SerializationSchema.class),
- Mockito.any(PulsarKeyExtractor.class)
+ Mockito.any(PulsarKeyExtractor.class),
+ Mockito.any(PulsarPropertiesExtractor.class)
).thenReturn(producer);
FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
+ FieldUtils.writeField(sink, "propertiesExtractor", Mockito.mock(PulsarPropertiesExtractor.class), true);
return sink;
}
}