[FLINK-24283][connector/pulsar] Use stick key consumer in Key_Shared subscription. This would make sure Pulsar won't treat the flink reader as a shared consumer.
This fix https://github.com/apache/pulsar/pull/12035
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 4e92b2d..67cc3c7 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -26,6 +26,7 @@
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.util.FlinkRuntimeException;
@@ -33,9 +34,13 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.KeySharedPolicy.KeySharedPolicySticky;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -173,8 +178,7 @@
}
private void seekStartPosition(Set<TopicPartition> partitions) {
- ConsumerBuilder<byte[]> consumerBuilder =
- createConsumerBuilder(pulsarClient, Schema.BYTES, configuration);
+ ConsumerBuilder<byte[]> consumerBuilder = consumerBuilder();
Set<String> seekedTopics = new HashSet<>();
for (TopicPartition partition : partitions) {
@@ -200,6 +204,20 @@
}
}
+ private ConsumerBuilder<byte[]> consumerBuilder() {
+ ConsumerBuilder<byte[]> builder =
+ createConsumerBuilder(pulsarClient, Schema.BYTES, configuration);
+ if (sourceConfiguration.getSubscriptionType() == SubscriptionType.Key_Shared) {
+ Range range = TopicRange.createFullRange().toPulsarRange();
+ KeySharedPolicySticky keySharedPolicy = KeySharedPolicy.stickyHashRange().ranges(range);
+ // Force this consume use sticky hash range in Key_Shared subscription.
+ // Pulsar won't remove old message dispatcher before 2.8.2 release.
+ builder.keySharedPolicy(keySharedPolicy);
+ }
+
+ return builder;
+ }
+
/**
* Check if there's any partition changes within subscribed topic partitions fetched by worker
* thread, and convert them to splits the assign them to pulsar readers.