PIP-392: Add configuration to enable consistent hashing to select active consumer for partitioned topic

Background knowledge

Since #19502, consistent hashing is used to select the active consumer for non-partitioned topics.

Motivation

Currently, for partitioned topics, the active consumer is selected using the formula partitionedIndex % consumerSize. This method can lead to uneven distribution of active consumers.

Consider a scenario with 100 topics named public/default/topic-{0~100}, each having one partition. If 10 consumers are created using a regex subscription with the Failover type, every topic has only partition index 0, so 0 % 10 = 0 and all topics are assigned to the same consumer (the first connected consumer). The other consumers are never selected as active, which results in an imbalanced distribution.
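The scenario above can be reproduced with a few lines of arithmetic. The following is a minimal sketch, not broker code: the class and method names are hypothetical, and it only applies the partitionedIndex % consumerSize rule quoted above to 100 single-partition topics and 10 consumers.

```java
import java.util.Arrays;

/** Hypothetical demo class; it only illustrates the modulo rule described above. */
final class ModuloSelectionDemo {

    /** The modulo rule from the Motivation section: partition index modulo consumer count. */
    static int selectByModulo(int partitionIndex, int consumerSize) {
        return partitionIndex % consumerSize;
    }

    /** Counts, per consumer index, how many single-partition topics it becomes active for. */
    static int[] countActiveTopics(int topicCount, int consumerSize) {
        int[] counts = new int[consumerSize];
        for (int t = 0; t < topicCount; t++) {
            int partitionIndex = 0; // a topic with one partition only has index 0
            counts[selectByModulo(partitionIndex, consumerSize)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints [100, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        System.out.println(Arrays.toString(countActiveTopics(100, 10)));
    }
}
```

Because the partition index is always 0, the result does not depend on the topic name at all, which is why adding more consumers does not help in this case.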

Goals

In Scope

  • Address the issue of imbalance for failover subscription type consumers in single-partition or few-partition topics.

Out of Scope

  • The exclusive subscription type.

Note that both the modulo algorithm and the consistent hashing algorithm can cause the active consumer to change. This may result in messages being delivered more than once, which is a known issue mentioned in the documentation: https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#failover

High Level Design

The solution involves adding a configuration setting that allows users to enable consistent hashing for partitioned topics. When enabled, the consumer selection process will use consistent hashing instead of the modulo operation.

The algorithm already exists; it was introduced in #19502.

In simple terms, the hash algorithm includes the following steps:

  1. Hash Ring Creation: Traverse all consumers and use each consumer's name to build a hash ring with 100 virtual nodes per consumer.

Existing code:

    private NavigableMap<Integer, Integer> makeHashRing(int consumerSize) {
        NavigableMap<Integer, Integer> hashRing = new TreeMap<>();
        for (int i = 0; i < consumerSize; i++) {
            for (int j = 0; j < CONSUMER_CONSISTENT_HASH_REPLICAS; j++) {
                String key = consumers.get(i).consumerName() + j;
                int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
                hashRing.put(hash, i);
            }
        }
        return Collections.unmodifiableNavigableMap(hashRing);
    }

  2. Consumer Selection: Use the hash of the topic name to select the matching consumer from the hash ring.

Existing code:

    private int peekConsumerIndexFromHashRing(NavigableMap<Integer, Integer> hashRing) {
        int hash = Murmur3_32Hash.getInstance().makeHash(topicName);
        Map.Entry<Integer, Integer> ceilingEntry = hashRing.ceilingEntry(hash);
        return ceilingEntry != null ? ceilingEntry.getValue() : hashRing.firstEntry().getValue();
    }

This approach spreads active consumers more evenly across topics, improving load balancing and resource utilization.
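To see the effect on the 100-topic scenario from the Motivation section, the two steps can be combined in a standalone sketch. This is an illustration under stated assumptions, not the broker implementation: the class name, the consumer names, and the topic names are hypothetical, and the hash function is a stand-in (the first four bytes of an MD5 digest) used only so the example runs without Pulsar's Murmur3_32Hash on the classpath. The exact per-consumer counts therefore differ from what a broker would produce.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical standalone sketch of the two steps above; not Pulsar code. */
final class HashRingSketch {
    // Assumption: 100 virtual nodes per consumer, as described in step 1.
    static final int REPLICAS = 100;

    /** Stand-in hash (NOT Murmur3_32Hash): first 4 bytes of MD5, forced non-negative. */
    static int hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            int h = ((d[0] & 0xFF) << 24) | ((d[1] & 0xFF) << 16)
                    | ((d[2] & 0xFF) << 8) | (d[3] & 0xFF);
            return h & Integer.MAX_VALUE;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Step 1: place REPLICAS virtual nodes per consumer on the ring. */
    static NavigableMap<Integer, Integer> makeHashRing(List<String> consumerNames) {
        NavigableMap<Integer, Integer> ring = new TreeMap<>();
        for (int i = 0; i < consumerNames.size(); i++) {
            for (int j = 0; j < REPLICAS; j++) {
                ring.put(hash(consumerNames.get(i) + j), i);
            }
        }
        return ring;
    }

    /** Step 2: pick the first node at or after the topic hash, wrapping around. */
    static int pickConsumer(NavigableMap<Integer, Integer> ring, String topicName) {
        Map.Entry<Integer, Integer> entry = ring.ceilingEntry(hash(topicName));
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }

    /** Counts how many topics each consumer index becomes active for. */
    static int[] distribute(List<String> consumerNames, int topicCount) {
        NavigableMap<Integer, Integer> ring = makeHashRing(consumerNames);
        int[] counts = new int[consumerNames.size()];
        for (int t = 0; t < topicCount; t++) {
            counts[pickConsumer(ring, "public/default/topic-" + t)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            consumers.add("consumer-" + i); // illustrative consumer names
        }
        System.out.println(Arrays.toString(distribute(consumers, 100)));
    }
}
```

Since the selection now depends on the topic name rather than on the partition index, single-partition topics no longer all map to the same consumer index.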

Detailed Design

Design & Implementation Details

Refer to implementation PR: https://github.com/apache/pulsar/pull/23584

The implementation is simple: if activeConsumerFailoverConsistentHashing is enabled, the consistent hashing algorithm is used regardless of whether the topic is partitioned.
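The resulting decision can be summarized as a small truth table. The sketch below is only an illustration of the rule stated in this proposal; the class, enum, and method names are hypothetical and are not taken from the implementation PR.

```java
/** Hypothetical sketch of the selection rule described above; not the PR code. */
final class ActiveConsumerStrategySketch {

    enum Strategy { MODULO, CONSISTENT_HASH }

    /**
     * Non-partitioned topics already use consistent hashing (since #19502).
     * Partitioned topics use it only when the new flag is enabled; otherwise
     * they keep the partitionedIndex % consumerSize rule.
     */
    static Strategy choose(boolean partitionedTopic, boolean consistentHashingEnabled) {
        if (!partitionedTopic || consistentHashingEnabled) {
            return Strategy.CONSISTENT_HASH;
        }
        return Strategy.MODULO;
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        for (boolean partitioned : values) {
            for (boolean enabled : values) {
                System.out.println("partitioned=" + partitioned
                        + ", flag=" + enabled + " -> " + choose(partitioned, enabled));
            }
        }
    }
}
```

Only the combination of a partitioned topic and a disabled flag keeps the modulo rule, which is the current behavior.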

Public-facing Changes

If activeConsumerFailoverConsistentHashing is enabled and the failover subscription type is used, the first consumer will not necessarily consume P1 and the second consumer will not necessarily consume P2, which differs from the behavior described in the documentation: https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#failover--partitioned-topics

Instead, the hash algorithm will determine which consumer consumes which partition.

Configuration

A new configuration field will be added:

    @FieldContext(
        category = CATEGORY_POLICIES,
        doc = "Enable consistent hashing for selecting the active consumer in partitioned "
            + "topics with Failover subscription type. "
            + "For non-partitioned topics, consistent hashing is used by default."
    )
    private boolean activeConsumerFailoverConsistentHashing = false;

Backward & Forward Compatibility

The default value is false, which preserves the existing behavior.