Since #19502, consistent hashing has been used to select the active consumer for non-partitioned topics.
Currently, for partitioned topics, the active consumer is selected using the formula partitionedIndex % consumerSize. This method can lead to uneven distribution of active consumers.
Consider a scenario with 100 topics named public/default/topic-{0~100}, each having one partition. If 10 consumers are created using a regex subscription with the Failover type, all topics will be assigned to the same consumer (the first connected consumer), because the only partition of every topic has index 0 and 0 % 10 is always 0. This results in an imbalanced distribution of active consumers.
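The scenario above can be reproduced with a few lines of Java. This is a minimal sketch, not broker code: the class `ModuloSelectionDemo` and its methods are hypothetical names, and only the formula `partitionedIndex % consumerSize` is taken from the text.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; not part of Pulsar.
final class ModuloSelectionDemo {

    // The modulo rule quoted above: partitionedIndex % consumerSize.
    static int pickActiveConsumer(int partitionIndex, int consumerSize) {
        return partitionIndex % consumerSize;
    }

    // Counts how many partitions each consumer index becomes the active consumer for.
    static Map<Integer, Integer> assign(int topicCount, int partitionsPerTopic, int consumerSize) {
        Map<Integer, Integer> activeCount = new HashMap<>();
        for (int t = 0; t < topicCount; t++) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                activeCount.merge(pickActiveConsumer(p, consumerSize), 1, Integer::sum);
            }
        }
        return activeCount;
    }

    public static void main(String[] args) {
        // 100 topics, one partition each, 10 consumers.
        System.out.println(assign(100, 1, 10)); // prints {0=100}
    }
}
```

Every topic contributes only partition index 0, so consumer index 0 is active for all 100 topics and the other nine consumers stay idle.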
In scope: failover subscription type consumers in single-partition or few-partition topics.
Out of scope: exclusive subscription type.
It's important to note that both the modulo algorithm and the consistent hashing algorithm can cause the active consumer role to be transferred to another consumer. This might result in messages being delivered multiple times to consumers, which is a known issue and is mentioned in the documentation: https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#failover
The solution involves adding a configuration setting that allows users to enable consistent hashing for partitioned topics. When enabled, the consumer selection process will use consistent hashing instead of the modulo operation.
The algorithm already exists; it was introduced in #19502.
In simple terms, the hash algorithm includes the following steps:
First, hash each consumer name to build a hash ring with 100 virtual nodes per consumer.

    private NavigableMap<Integer, Integer> makeHashRing(int consumerSize) {
        NavigableMap<Integer, Integer> hashRing = new TreeMap<>();
        for (int i = 0; i < consumerSize; i++) {
            // One ring entry per virtual node, mapped to the consumer's index.
            for (int j = 0; j < CONSUMER_CONSISTENT_HASH_REPLICAS; j++) {
                String key = consumers.get(i).consumerName() + j;
                int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
                hashRing.put(hash, i);
            }
        }
        return Collections.unmodifiableNavigableMap(hashRing);
    }

Then, hash the topic name and pick the consumer at the first ring entry at or after that hash, wrapping around to the first entry if there is none.

    private int peekConsumerIndexFromHashRing(NavigableMap<Integer, Integer> hashRing) {
        int hash = Murmur3Hash32.getInstance().makeHash(topicName);
        Map.Entry<Integer, Integer> ceilingEntry = hashRing.ceilingEntry(hash);
        // No entry at or after the hash: wrap around to the start of the ring.
        return ceilingEntry != null ? ceilingEntry.getValue() : hashRing.firstEntry().getValue();
    }
This approach ensures a more even distribution of active consumers across topics, improving load balancing and resource utilization.
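To see the effect outside the broker, the two steps can be sketched as a standalone program. This is an illustration under stated assumptions, not the Pulsar implementation: `HashRingDemo` and its methods are hypothetical, the ring stores consumer names rather than list indexes, and the hash is a stand-in built from MD5 because Murmur3 is not available in the JDK.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical demo class; not part of Pulsar.
final class HashRingDemo {
    static final int REPLICAS = 100; // virtual nodes per consumer

    // Stand-in hash: first 4 bytes of MD5. Assumption: the broker uses Murmur3 instead.
    static int hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            return ((d[0] & 0xff) << 24) | ((d[1] & 0xff) << 16) | ((d[2] & 0xff) << 8) | (d[3] & 0xff);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is required on every Java platform
        }
    }

    // Maps each virtual-node hash to the name of the consumer that owns it.
    static NavigableMap<Integer, String> makeHashRing(List<String> consumerNames) {
        NavigableMap<Integer, String> ring = new TreeMap<>();
        for (String name : consumerNames) {
            for (int j = 0; j < REPLICAS; j++) {
                ring.put(hash(name + j), name);
            }
        }
        return ring;
    }

    // First virtual node at or after the topic hash, wrapping around to the ring start.
    static String pick(NavigableMap<Integer, String> ring, String topicName) {
        Map.Entry<Integer, String> entry = ring.ceilingEntry(hash(topicName));
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }

    // Active consumer for each of the topics public/default/topic-0 .. topic-(topicCount - 1).
    static Map<String, String> assign(List<String> consumerNames, int topicCount) {
        NavigableMap<Integer, String> ring = makeHashRing(consumerNames);
        Map<String, String> owners = new HashMap<>();
        for (int t = 0; t < topicCount; t++) {
            String topic = "public/default/topic-" + t;
            owners.put(topic, pick(ring, topic));
        }
        return owners;
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            consumers.add("consumer-" + i);
        }
        Map<String, String> before = assign(consumers, 100);
        consumers.remove("consumer-3"); // simulate one consumer disconnecting
        Map<String, String> after = assign(consumers, 100);

        long moved = before.keySet().stream()
                .filter(topic -> !before.get(topic).equals(after.get(topic)))
                .count();
        System.out.println("distinct active consumers: " + new HashSet<>(before.values()).size());
        System.out.println("topics moved after consumer-3 left: " + moved);
    }
}
```

In this sketch, only the topics that were owned by the removed consumer get a new active consumer; the remaining assignments are untouched. Those moves are still consumer transfers, so the duplicate-delivery caveat for the failover subscription type applies to them as well.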
Refer to implementation PR: https://github.com/apache/pulsar/pull/23584
The implementation is simple. If activeConsumerFailoverConsistentHashing is enabled, the consistent hashing algorithm is used regardless of whether the topic is partitioned.
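The resulting decision can be sketched as follows. This is a hypothetical illustration rather than the code from the PR: the class `ActiveConsumerSelector`, the `hashRingPick` callback, and the convention that a negative partition index marks a non-partitioned topic are all assumptions of this sketch; only the flag name comes from the proposal.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch; only the flag name is taken from the proposal.
final class ActiveConsumerSelector {

    static int pick(boolean activeConsumerFailoverConsistentHashing,
                    int partitionIndex,
                    int consumerSize,
                    IntSupplier hashRingPick) {
        // Assumption: a negative index marks a non-partitioned topic.
        boolean partitioned = partitionIndex >= 0;
        if (partitioned && !activeConsumerFailoverConsistentHashing) {
            return partitionIndex % consumerSize; // original modulo behavior
        }
        return hashRingPick.getAsInt(); // consistent hashing
    }

    public static void main(String[] args) {
        IntSupplier ring = () -> 7; // placeholder for a real hash-ring lookup
        System.out.println(pick(false, 3, 10, ring));  // prints 3
        System.out.println(pick(true, 3, 10, ring));   // prints 7
        System.out.println(pick(false, -1, 10, ring)); // prints 7
    }
}
```

With the flag off, partitioned topics keep the modulo rule and non-partitioned topics keep consistent hashing, which matches the current behavior.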
If activeConsumerFailoverConsistentHashing is enabled and users use the failover subscription type, the first consumer will not necessarily consume P1, and the second consumer will not necessarily consume P2, unlike the assignment described in the documentation: https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#failover--partitioned-topics
Instead, the hash algorithm will determine which consumer consumes which partition.
A new configuration field will be added:
    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Enable consistent hashing for selecting the active consumer in partitioned "
            + "topics with Failover subscription type. "
            + "For non-partitioned topics, consistent hashing is used by default."
    )
    private boolean activeConsumerFailoverConsistentHashing = false;
The default value is false, which keeps the original behavior.