blob: d6c899be8041a7567e928fe026306355bae4e032 [file] [log] [blame] [view]
# PIP-392: Add configuration to enable consistent hashing to select active consumer for partitioned topic
# Background knowledge
After [#19502](https://github.com/apache/pulsar/pull/19502) will use consistent hashing to select active consumer for non-partitioned topic
# Motivation
Currently, for partitioned topics, the active consumer is selected using the formula [partitionedIndex % consumerSize](https://github.com/apache/pulsar/blob/137df29f85798b00de75460a1acb91c7bc25453f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L129-L130).
This method can lead to uneven distribution of active consumers.
Consider a scenario with 100 topics named `public/default/topic-{0~100}`, each having `one partition`.
If 10 consumers are created using a `regex` subscription with the `Failover type`, all topic will be assigned to the same consumer(the first connected consumer). This results in an imbalanced distribution of consumers.
# Goals
## In Scope
- Address the issue of imbalance for `failover` subscription type consumers in single-partition or few-partition topics.
## Out of Scope
- Excluding the `exclusive` subscription type.
It's important to note that both the `modulo algorithm` and the `consistent hashing algorithm` can cause the consumer to be transferred.
This might result in messages being delivered multiple times to consumers, which is a known issue and has been mentioned in the documentation.
https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#failover
# High Level Design
The solution involves adding a configuration setting that allows users to enable consistent hashing for partitioned topics.
When enabled, the consumer selection process will use consistent hashing instead of the modulo operation.
The algorithm already exists through [#19502](https://github.com/apache/pulsar/pull/19502)
In simple terms, the hash algorithm includes the following steps:
1. Hash Ring Creation: Traverse all consumers and use `consumer name` to calculate a hash ring with 100 virtual nodes.
[Exist code](https://github.com/apache/pulsar/blob/1b1bd4b610dd768a6908964ef841a6790bb0f4f0/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L152-L162)
```java
private NavigableMap<Integer, Integer> makeHashRing(int consumerSize) {
NavigableMap<Integer, Integer> hashRing = new TreeMap<>();
for (int i = 0; i < consumerSize; i++) {
for (int j = 0; j < CONSUMER_CONSISTENT_HASH_REPLICAS; j++) {
String key = consumers.get(i).consumerName() + j;
int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
hashRing.put(hash, i);
}
}
return Collections.unmodifiableNavigableMap(hashRing);
}
```
2. Consumer Selection: Use the hash of the topic name to select the matching consumer from the hash ring.
[Exist code](https://github.com/apache/pulsar/blob/1b1bd4b610dd768a6908964ef841a6790bb0f4f0/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L146-L150)
```java
private int peekConsumerIndexFromHashRing(NavigableMap<Integer, Integer> hashRing) {
int hash = Murmur3Hash32.getInstance().makeHash(topicName);
Map.Entry<Integer, Integer> ceilingEntry = hashRing.ceilingEntry(hash);
return ceilingEntry != null ? ceilingEntry.getValue() : hashRing.firstEntry().getValue();
}
```
This approach ensures a more even distribution of active consumers across topics, improving load balancing and resource utilization.
# Detailed Design
## Design & Implementation Details
Refer to implementation PR: https://github.com/apache/pulsar/pull/23584
The implementation is simple. If this activeConsumerFailoverConsistentHashing is enabled, the consistent hashing algorithm is used regardless of whether the topic is partitioned.
## Public-facing Changes
If activeConsumerFailoverConsistentHashing is enabled, when users use the failover subscription model,
the `first consumer` will not necessarily consume `P1`, and the `second consumer` will not necessarily consume `P2`.
As described in the documentation:: https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#failover--partitioned-topics
Instead, the hash algorithm will determine which consumer consumes which partition.
### Configuration
A new configuration field will be added:
```java
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Enable consistent hashing for selecting the active consumer in partitioned "
+ "topics with Failover subscription type. "
+ "For non-partitioned topics, consistent hashing is used by default."
)
private boolean activeConsumerFailoverConsistentHashing = false;
```
# Backward & Forward Compatibility
The default value is false to keep original behavior.