| # PIP-392: Add configuration to enable consistent hashing to select active consumer for partitioned topic |
| |
| # Background knowledge |
| |
| Since [#19502](https://github.com/apache/pulsar/pull/19502), the broker uses consistent hashing to select the active consumer for non-partitioned topics. |
| |
| # Motivation |
| |
| Currently, for partitioned topics, the active consumer is selected using the formula [partitionedIndex % consumerSize](https://github.com/apache/pulsar/blob/137df29f85798b00de75460a1acb91c7bc25453f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L129-L130). |
| This method can lead to uneven distribution of active consumers. |
| |
| Consider a scenario with 100 topics named `public/default/topic-{0~99}`, each having a single partition. |
| If 10 consumers are created using a `regex` subscription with the `Failover` subscription type, the only partition of every topic has index 0, so `0 % 10 = 0` and all topics are assigned to the same consumer (the first connected consumer). The other nine consumers receive no topics, so the load is concentrated on a single consumer. |
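
The scenario above can be reproduced with a few lines of Java. This is only an illustrative sketch: the class and method names (`ModuloSelectionSketch`, `pickByModulo`, `distribute`) are hypothetical and do not exist in Pulsar; only the formula `partitionedIndex % consumerSize` comes from the linked code.

```java
import java.util.Arrays;

public class ModuloSelectionSketch {
    // Mirrors the formula partitionedIndex % consumerSize (hypothetical helper).
    static int pickByModulo(int partitionIndex, int consumerSize) {
        return partitionIndex % consumerSize;
    }

    // Counts how many single-partition topics each consumer index receives.
    static int[] distribute(int topicCount, int consumerSize) {
        int[] counts = new int[consumerSize];
        for (int t = 0; t < topicCount; t++) {
            int partitionIndex = 0; // the only partition of each topic
            counts[pickByModulo(partitionIndex, consumerSize)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints [100, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        System.out.println(Arrays.toString(distribute(100, 10)));
    }
}
```

Because the topic name never enters the formula, adding more consumers does not change the outcome for single-partition topics.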
| |
| # Goals |
| |
| ## In Scope |
| - Address the issue of imbalance for `failover` subscription type consumers in single-partition or few-partition topics. |
| |
| ## Out of Scope |
| - The `exclusive` subscription type is not covered by this proposal. |
| |
| Note that both the `modulo algorithm` and the `consistent hashing algorithm` can move the active consumer role from one consumer to another. |
| This might result in messages being delivered more than once, which is a known issue that is already mentioned in the [documentation](https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#failover). |
| |
| # High Level Design |
| The solution involves adding a configuration setting that allows users to enable consistent hashing for partitioned topics. |
| When enabled, the consumer selection process will use consistent hashing instead of the modulo operation. |
| |
| The algorithm already exists; it was introduced by [#19502](https://github.com/apache/pulsar/pull/19502). |
| |
| In simple terms, the hash algorithm includes the following steps: |
| |
| 1. Hash Ring Creation: Traverse all consumers and use each `consumer name` to build a hash ring with 100 virtual nodes per consumer. |
| |
| [Existing code](https://github.com/apache/pulsar/blob/1b1bd4b610dd768a6908964ef841a6790bb0f4f0/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L152-L162) |
| ```java |
| // Builds the hash ring: each consumer contributes CONSUMER_CONSISTENT_HASH_REPLICAS virtual nodes. |
| private NavigableMap<Integer, Integer> makeHashRing(int consumerSize) { |
|     NavigableMap<Integer, Integer> hashRing = new TreeMap<>(); |
|     for (int i = 0; i < consumerSize; i++) { |
|         for (int j = 0; j < CONSUMER_CONSISTENT_HASH_REPLICAS; j++) { |
|             // Virtual node key: consumer name plus replica number. |
|             String key = consumers.get(i).consumerName() + j; |
|             int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes()); |
|             // Map the ring position to the consumer's index in the list. |
|             hashRing.put(hash, i); |
|         } |
|     } |
|     return Collections.unmodifiableNavigableMap(hashRing); |
| } |
| ``` |
| |
| 2. Consumer Selection: Use the hash of the topic name to select the matching consumer from the hash ring. |
| |
| [Existing code](https://github.com/apache/pulsar/blob/1b1bd4b610dd768a6908964ef841a6790bb0f4f0/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L146-L150) |
| ```java |
| private int peekConsumerIndexFromHashRing(NavigableMap<Integer, Integer> hashRing) { |
|     int hash = Murmur3_32Hash.getInstance().makeHash(topicName); |
|     // First virtual node at or after the topic hash; wrap around to the first entry if none. |
|     Map.Entry<Integer, Integer> ceilingEntry = hashRing.ceilingEntry(hash); |
|     return ceilingEntry != null ? ceilingEntry.getValue() : hashRing.firstEntry().getValue(); |
| } |
| ``` |
| |
| Because the selection depends on the topic name rather than only on the partition index, this approach is expected to distribute active consumers more evenly across topics, improving load balancing and resource utilization. |
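
The two steps can be tried outside the broker with a self-contained sketch. Everything here is an assumption made for illustration: the class `HashRingSketch`, the constant `REPLICAS`, and the consumer and topic names are hypothetical, and `CRC32` from the JDK stands in for Pulsar's `Murmur3_32Hash`, so the concrete assignments will differ from what a broker computes.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.zip.CRC32;

public class HashRingSketch {
    // Hypothetical stand-in for CONSUMER_CONSISTENT_HASH_REPLICAS.
    static final int REPLICAS = 100;

    // Stand-in hash: CRC32 over UTF-8 bytes (the broker uses a Murmur3 hash).
    static int hash(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) crc.getValue();
    }

    // Step 1: place REPLICAS virtual nodes per consumer on the ring.
    static NavigableMap<Integer, Integer> makeHashRing(List<String> consumerNames) {
        NavigableMap<Integer, Integer> ring = new TreeMap<>();
        for (int i = 0; i < consumerNames.size(); i++) {
            for (int j = 0; j < REPLICAS; j++) {
                ring.put(hash(consumerNames.get(i) + j), i);
            }
        }
        return ring;
    }

    // Step 2: take the first virtual node at or after the topic hash, wrapping around.
    static int pickConsumerIndex(NavigableMap<Integer, Integer> ring, String topicName) {
        Map.Entry<Integer, Integer> ceiling = ring.ceilingEntry(hash(topicName));
        return ceiling != null ? ceiling.getValue() : ring.firstEntry().getValue();
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("consumer-a", "consumer-b", "consumer-c");
        NavigableMap<Integer, Integer> ring = makeHashRing(consumers);
        int[] counts = new int[consumers.size()];
        for (int t = 0; t < 100; t++) {
            counts[pickConsumerIndex(ring, "persistent://public/default/topic-" + t)]++;
        }
        for (int i = 0; i < counts.length; i++) {
            System.out.println(consumers.get(i) + " -> " + counts[i] + " topics");
        }
    }
}
```

Running `main` prints how many of the 100 topics each consumer would own. The split is not guaranteed to be perfectly even, but it no longer depends on the partition index alone.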
| |
| # Detailed Design |
| |
| ## Design & Implementation Details |
| Refer to implementation PR: https://github.com/apache/pulsar/pull/23584 |
| |
| The implementation is simple: if `activeConsumerFailoverConsistentHashing` is enabled, the consistent hashing algorithm is used regardless of whether the topic is partitioned. |
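
The selection rule can be sketched as a single branch. This is not the code from the implementation PR: `ActiveConsumerPickSketch`, `pickIndex`, and the `hashRingPick` supplier are hypothetical names, and the sketch assumes that a non-partitioned topic is represented by a negative partition index.

```java
import java.util.function.IntSupplier;

public class ActiveConsumerPickSketch {
    // Returns the index of the active consumer.
    // hashRingPick is a hypothetical hook that yields the hash-ring result.
    static int pickIndex(int partitionIndex, int consumerSize,
                         boolean consistentHashingEnabled, IntSupplier hashRingPick) {
        // Assumption: partitionIndex < 0 means the topic is not partitioned.
        boolean partitioned = partitionIndex >= 0;
        if (partitioned && !consistentHashingEnabled) {
            // Original behavior for partitioned topics.
            return partitionIndex % consumerSize;
        }
        // Non-partitioned topics, or any topic once the flag is enabled.
        return hashRingPick.getAsInt();
    }

    public static void main(String[] args) {
        IntSupplier ring = () -> 7; // placeholder for the hash-ring lookup
        System.out.println(pickIndex(3, 2, false, ring)); // prints 1
        System.out.println(pickIndex(3, 2, true, ring));  // prints 7
        System.out.println(pickIndex(-1, 2, false, ring)); // prints 7
    }
}
```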
| |
| ## Public-facing Changes |
| |
| If `activeConsumerFailoverConsistentHashing` is enabled and users consume a partitioned topic with the failover subscription type, |
| the `first consumer` will not necessarily consume `P1`, and the `second consumer` will not necessarily consume `P2`. |
| This differs from the behavior described in the documentation: https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#failover--partitioned-topics |
| |
| Instead, the hash algorithm determines which consumer consumes which partition. |
| |
| ### Configuration |
| |
| A new configuration field will be added: |
| |
| ```java |
| @FieldContext( |
|     category = CATEGORY_POLICIES, |
|     doc = "Enable consistent hashing for selecting the active consumer in partitioned " |
|         + "topics with Failover subscription type. " |
|         + "For non-partitioned topics, consistent hashing is used by default." |
| ) |
| private boolean activeConsumerFailoverConsistentHashing = false; |
| ``` |
| |
| |
| # Backward & Forward Compatibility |
| The default value is `false`, which keeps the original behavior unless the setting is explicitly enabled. |