Background knowledge

According to the documentation, numWorkerThreadsForNonPersistentTopic specifies the number of worker threads that serve non-persistent topics. In practice, it sets the thread count of BrokerService#topicOrderedExecutor. That executor was originally meant only for non-persistent topics, but it is now used for any work that must run in strict order for a topic, such as processing subscriptions, even for persistent topics:

  • Only one place invokes topicOrderedExecutor for non-persistent topics. [1]
  • The other places invoke topicOrderedExecutor for persistent topics or persistent dispatchers. [2] [3] [4] [5]
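
To illustrate what "strict order for a topic" means, here is a minimal sketch of an ordered executor built only from JDK classes. It is not Pulsar's or BookKeeper's implementation; the class TopicOrderedExecutorSketch and its methods are hypothetical names chosen for this example. The assumption is that each topic name is hashed to one single-threaded worker, so tasks for the same topic run in submission order while different topics can run in parallel. The constructor argument plays the role of the thread number discussed in this proposal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an ordered executor; not the class Pulsar actually uses.
final class TopicOrderedExecutorSketch {
    private final ExecutorService[] threads;

    TopicOrderedExecutorSketch(int numThreads) {
        threads = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            // One single-threaded executor per slot keeps tasks in that slot sequential.
            threads[i] = Executors.newSingleThreadExecutor();
        }
    }

    // The same topic name always maps to the same worker index.
    int chooseThreadIndex(String topicName) {
        return Math.floorMod(topicName.hashCode(), threads.length);
    }

    void executeOrdered(String topicName, Runnable task) {
        threads[chooseThreadIndex(topicName)].execute(task);
    }

    void shutdownAndAwait() {
        for (ExecutorService t : threads) {
            t.shutdown();
        }
        try {
            for (ExecutorService t : threads) {
                t.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits n numbered tasks for one topic and returns the order in which they ran.
    static List<Integer> runInOrder(String topicName, int numThreads, int n) {
        TopicOrderedExecutorSketch executor = new TopicOrderedExecutorSketch(numThreads);
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < n; i++) {
            final int id = i;
            executor.executeOrdered(topicName, () -> seen.add(id));
        }
        executor.shutdownAndAwait();
        return seen;
    }
}
```

In this sketch, calling TopicOrderedExecutorSketch.runInOrder with any topic name returns the task numbers in the order they were submitted, regardless of how many worker threads exist, because all tasks for that topic land on the same worker.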

Motivation

Give this configuration a name that reflects what it actually controls, so that users can better understand what they are configuring.

High Level Design

Introduce topicOrderedExecutorThreadNum and deprecate numWorkerThreadsForNonPersistentTopic.

Detailed Design

Design & Implementation Details

Configuration

  • Introduce topicOrderedExecutorThreadNum with default value Runtime.getRuntime().availableProcessors():
private int topicOrderedExecutorThreadNum = Runtime.getRuntime().availableProcessors();
  • Deprecate numWorkerThreadsForNonPersistentTopic and change its default value from Runtime.getRuntime().availableProcessors() to -1:
private int numWorkerThreadsForNonPersistentTopic = -1;
  • Override the Lombok-generated getter ServiceConfiguration#getTopicOrderedExecutorThreadNum():
public int getTopicOrderedExecutorThreadNum() {
    return numWorkerThreadsForNonPersistentTopic > 0
            ? numWorkerThreadsForNonPersistentTopic : topicOrderedExecutorThreadNum;
}
  • All callers of ServiceConfiguration#getNumWorkerThreadsForNonPersistentTopic() will call ServiceConfiguration#getTopicOrderedExecutorThreadNum() instead.

Backward & Forward Compatibility

Because the Lombok-generated getter getTopicOrderedExecutorThreadNum() is overridden:

  • If the user has not set numWorkerThreadsForNonPersistentTopic, the number of worker threads remains Runtime.getRuntime().availableProcessors().
  • If the user has set numWorkerThreadsForNonPersistentTopic, the number of worker threads remains the value the user set.
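
The two compatibility cases can be checked with a small, self-contained sketch. ThreadNumConfigSketch is a hypothetical, trimmed-down stand-in for ServiceConfiguration that contains only the two fields from this proposal and the overridden getter; it leaves out Lombok and every other broker setting, and the values 8 and 16 are arbitrary.

```java
// Hypothetical stand-in for ServiceConfiguration; only the two fields discussed here.
final class ThreadNumConfigSketch {
    // Deprecated setting: the new default of -1 means "not set by the user".
    int numWorkerThreadsForNonPersistentTopic = -1;
    // New setting, defaulting to the number of available processors.
    int topicOrderedExecutorThreadNum = Runtime.getRuntime().availableProcessors();

    // Same fallback logic as the overridden getter in this proposal.
    int getTopicOrderedExecutorThreadNum() {
        return numWorkerThreadsForNonPersistentTopic > 0
                ? numWorkerThreadsForNonPersistentTopic : topicOrderedExecutorThreadNum;
    }
}

final class ThreadNumConfigDemo {
    public static void main(String[] args) {
        // Case 1: nothing set, so the default (available processors) is used.
        ThreadNumConfigSketch defaults = new ThreadNumConfigSketch();
        System.out.println("default: " + defaults.getTopicOrderedExecutorThreadNum());

        // Case 2: an existing deployment that set the deprecated key keeps its value.
        ThreadNumConfigSketch legacy = new ThreadNumConfigSketch();
        legacy.numWorkerThreadsForNonPersistentTopic = 8;
        System.out.println("legacy: " + legacy.getTopicOrderedExecutorThreadNum());

        // Both keys set: the getter as written prefers the deprecated key.
        legacy.topicOrderedExecutorThreadNum = 16;
        System.out.println("both set: " + legacy.getTopicOrderedExecutorThreadNum());
    }
}
```

One consequence of the getter as written is that the deprecated setting takes precedence whenever it is positive, so topicOrderedExecutorThreadNum only takes effect once numWorkerThreadsForNonPersistentTopic is removed from the configuration or left at -1.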

Links