[FLINK-24283][connector/pulsar] Use stick key consumer in Key_Shared subscription. This would make sure Pulsar won't treat the flink reader as a shared consumer.

This fix https://github.com/apache/pulsar/pull/12035
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 4e92b2d..67cc3c7 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -26,6 +26,7 @@
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -33,9 +34,13 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.KeySharedPolicy.KeySharedPolicySticky;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -173,8 +178,7 @@
     }
 
     private void seekStartPosition(Set<TopicPartition> partitions) {
-        ConsumerBuilder<byte[]> consumerBuilder =
-                createConsumerBuilder(pulsarClient, Schema.BYTES, configuration);
+        ConsumerBuilder<byte[]> consumerBuilder = consumerBuilder();
         Set<String> seekedTopics = new HashSet<>();
 
         for (TopicPartition partition : partitions) {
@@ -200,6 +204,20 @@
         }
     }
 
+    private ConsumerBuilder<byte[]> consumerBuilder() {
+        ConsumerBuilder<byte[]> builder =
+                createConsumerBuilder(pulsarClient, Schema.BYTES, configuration);
+        if (sourceConfiguration.getSubscriptionType() == SubscriptionType.Key_Shared) {
+            Range range = TopicRange.createFullRange().toPulsarRange();
+            KeySharedPolicySticky keySharedPolicy = KeySharedPolicy.stickyHashRange().ranges(range);
+            // Force this consume use sticky hash range in Key_Shared subscription.
+            // Pulsar won't remove old message dispatcher before 2.8.2 release.
+            builder.keySharedPolicy(keySharedPolicy);
+        }
+
+        return builder;
+    }
+
     /**
      * Check if there's any partition changes within subscribed topic partitions fetched by worker
      * thread, and convert them to splits the assign them to pulsar readers.