| # PIP-359: Support custom message listener executor for specific subscription |
| Implementation PR: [#22861](https://github.com/apache/pulsar/pull/22861) |
| |
| # Background knowledge |
| In current Pulsar client versions, a user of a Pulsar Consumer |
| has two main options for consuming messages: |
| 1. Pull mode, by calling `consumer.receive()` (or `consumer.receiveAsync()`) |
| ```java |
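| import org.apache.pulsar.client.api.Consumer; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.Schema; |
| |
| public class ConsumerExample { |
|     public static void main(String[] args) throws PulsarClientException { |
|         PulsarClient pulsarClient = PulsarClient.builder() |
|                 .serviceUrl("pulsar://localhost:6650") |
|                 .build(); |
|         Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64) |
|                 .topic("persistent://public/default/my-topic") |
|                 .subscriptionName("my-subscription") |
|                 .subscribe(); |
|         // Pull mode: block until a message arrives, then acknowledge it. |
|         do { |
|             Message<Long> message = consumer.receive(); |
|             consumer.acknowledge(message); |
|         } while (true); |
|     } |
| } |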
| ``` |
| 2. Push mode, by registering a `MessageListener` when building the Consumer. |
| When a listener is registered, `consumer.receive()` (or `consumer.receiveAsync()`) can no longer be used. |
| In push mode, the consumer invokes the `MessageListener` instance on a thread |
| taken from an internal `ExecutorService` (i.e. thread pool). |
| The problem appears when multiple Consumers are built from the same PulsarClient: |
| those consumers share the same thread pool for calling their message listeners, |
| so a slow listener can delay the listeners of the other consumers. |
| |
| ```java |
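| import org.apache.pulsar.client.api.Consumer; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.Schema; |
| |
| public class ConsumerExample { |
|     public static void main(String[] args) throws PulsarClientException { |
|         PulsarClient pulsarClient = PulsarClient.builder() |
|                 .serviceUrl("pulsar://localhost:6650") |
|                 .build(); |
|         Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64) |
|                 .topic("persistent://public/default/my-topic") |
|                 .subscriptionName("my-subscription2") |
|                 // The lambda parameter is named `c` because a lambda parameter |
|                 // cannot reuse the name of the enclosing local variable `consumer`. |
|                 .messageListener((c, message) -> { |
|                     // process message |
|                     c.acknowledgeAsync(message); |
|                 }) |
|                 .subscribe(); |
|     } |
| } |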
| ``` |
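The interference described in item 2 can be reproduced without Pulsar. The following is a minimal sketch that uses only the JDK: plain executors stand in for the listener thread pool, and `SharedListenerPoolDemo` and its method names are hypothetical. The single-threaded pool is an assumption chosen to make the effect easy to observe, not a statement about the pool size the client actually uses.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo: plain JDK executors stand in for the listener thread pool.
class SharedListenerPoolDemo {

    // Returns true if a fast listener task completes while a slow one is still blocked.
    static boolean fastFinishesWhileSlowBlocked(ExecutorService slowPool, ExecutorService fastPool) {
        CountDownLatch slowStarted = new CountDownLatch(1);
        CountDownLatch releaseSlow = new CountDownLatch(1);
        // "Slow listener": holds its thread until it is released in the finally block.
        slowPool.execute(() -> {
            slowStarted.countDown();
            try {
                releaseSlow.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            slowStarted.await();
            // "Fast listener": returns immediately once it gets a thread.
            Future<?> fast = fastPool.submit(() -> { });
            fast.get(1, TimeUnit.SECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the fast task did not get a thread in time
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            releaseSlow.countDown();
        }
    }

    // Both "listeners" share one single-threaded pool.
    static boolean withSharedPool() {
        ExecutorService shared = Executors.newSingleThreadExecutor();
        try {
            return fastFinishesWhileSlowBlocked(shared, shared);
        } finally {
            shared.shutdown();
        }
    }

    // Each "listener" has its own pool.
    static boolean withSeparatePools() {
        ExecutorService slowPool = Executors.newSingleThreadExecutor();
        ExecutorService fastPool = Executors.newSingleThreadExecutor();
        try {
            return fastFinishesWhileSlowBlocked(slowPool, fastPool);
        } finally {
            slowPool.shutdown();
            fastPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared pool, fast task finished in time: " + withSharedPool());
        System.out.println("separate pools, fast task finished in time: " + withSeparatePools());
    }
}
```

With the shared pool the fast task stays queued behind the blocked one, so the first line reports `false`; with separate pools the fast task is expected to finish well within the timeout.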
| |
| # Motivation |
| |
| As mentioned in [Background knowledge](#background-knowledge), when messages are consumed asynchronously |
| through a registered `MessageListener`, different consumers can affect each other, |
| leading to unnecessary consumption delays. |
| For this scenario, this PIP proposes supporting a message listener executor |
| that is specified at the consumer level. |
| |
| # Goals |
| 1. Improve isolation between consumer message listeners, solving the consumption delays caused by |
| consumers from the same PulsarClient instance affecting each other. |
| |
| ## In Scope |
| If this PIP is accepted, it will help Pulsar solve the problem of different consumers |
| from the same `PulsarClient` affecting each other in the asynchronous consumption mode (`MessageListener`). |
| |
| ## Out of Scope |
| This PIP will not build the plugin library mentioned in the [PR](https://github.com/apache/pulsar/pull/22902#issuecomment-2169962642); |
| a new PIP will be opened in the future for that. |
| |
| |
| # Detailed Design |
| |
| ## Design & Implementation Details |
| |
| 1. Add an interface `MessageListenerExecutor`, responsible for executing message listener callback tasks. |
| Users can provide their own implementation to decide on which thread each message listener task is executed. |
| For example, in the situation described in the [Motivation](#motivation) section, users can implement the |
| interface on top of a dedicated thread pool, so that the message listener tasks of each |
| consumer run on threads that are not shared with other consumers. The caller is responsible for the life cycle of the |
| executor, and the executor is used only for this specific consumer. |
| ```java |
| public interface MessageListenerExecutor { |
| |
|     /** |
|      * Select a thread by message to execute the runnable. |
|      * For example, with the Key_Shared subscription type it may be necessary |
|      * to select the thread by the message ordering key to preserve order. |
|      * |
|      * @param message the message |
|      * @param runnable the runnable to execute |
|      */ |
|     void execute(Message<?> message, Runnable runnable); |
| } |
| ``` |
| 2. Add an optional config `messageListenerExecutor` in `ConsumerBuilder`, so that |
| users can pass their own implementation. |
| ```java |
| ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor); |
| ``` |
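Because the caller owns the executor's life cycle (item 1 above), the caller also has to shut the executor down, presumably after the consumer that uses it has been closed so that no new listener tasks arrive. The following is a minimal JDK-only sketch of such a shutdown; `ListenerExecutorLifecycle`, `shutdownGracefully` and `runAndShutdown` are hypothetical names, and the timeout values are arbitrary.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the Pulsar API.
class ListenerExecutorLifecycle {

    /**
     * Stops accepting new tasks, lets already queued listener tasks finish,
     * and interrupts whatever is still running after the timeout.
     *
     * @return true if the executor terminated
     */
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // Timed out: drop queued tasks and interrupt running ones.
            List<Runnable> dropped = executor.shutdownNow();
            System.out.println("dropped " + dropped.size() + " queued listener tasks");
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Submits `tasks` trivial tasks, shuts down, and returns how many completed (-1 on failure).
    static int runAndShutdown(int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(completed::incrementAndGet);
        }
        boolean terminated = shutdownGracefully(executor, 5, TimeUnit.SECONDS);
        return terminated ? completed.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runAndShutdown(100));
    }
}
```

Calling `shutdown()` before `awaitTermination` lets tasks that were already queued run to completion, which is why all submitted tasks are counted in this sketch.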
| |
| ### Why an interface like `MessageListenerExecutor` is needed |
| Some people may wonder why this PIP defines a new interface like `MessageListenerExecutor` |
| instead of just using `java.util.concurrent.ExecutorService`. |
| |
| The reason is as follows. |
| |
| For ordered consumption scenarios, messages with the same |
| key or from the same partition must be processed by the same thread to preserve order. |
| With `java.util.concurrent.ExecutorService` (see the following figure), no such guarantee can be made, |
| because the user does not control which thread of the ExecutorService executes a given task. |
|  |
|  |
| ### Interface implementation suggestions |
| When implementing the `MessageListenerExecutor` interface, you should consider the following points. |
| 1. If you need to preserve the order of message processing, |
| you can select the thread by the message ordering key or by `msg.getTopicName()` (the partition topic name), |
| so that messages with the same ordering key (or from the same partition) are processed in the same thread. |
| |
| ### Usage Example |
| ```java |
| private void startConsumerWithMessageListener(String topic, String subscriptionName) throws PulsarClientException { |
|     // for example: key_shared |
|     MessageListenerExecutor keySharedExecutor = getKeySharedMessageListenerExecutor(subscriptionName); |
|     Consumer<Long> keySharedConsumer = |
|             pulsarClient.newConsumer(Schema.INT64) |
|                     .topic(topic) |
|                     .subscriptionName(subscriptionName) |
|                     .messageListener((c1, msg) -> { |
|                         log.info("Received message [{}] in the listener", msg.getValue()); |
|                         c1.acknowledgeAsync(msg); |
|                     }) |
|                     // once set, the message listener is executed in this executor |
|                     .messageListenerExecutor(keySharedExecutor) |
|                     .subscribe(); |
| |
|     // for example: partition_ordered |
|     MessageListenerExecutor partitionOrderedExecutor = getPartitionOrderdMessageListenerExecutor(subscriptionName); |
|     Consumer<Long> partitionOrderedConsumer = |
|             pulsarClient.newConsumer(Schema.INT64) |
|                     .topic(topic) |
|                     .subscriptionName(subscriptionName) |
|                     .messageListener((c1, msg) -> { |
|                         log.info("Received message [{}] in the listener", msg.getValue()); |
|                         c1.acknowledgeAsync(msg); |
|                     }) |
|                     // once set, the message listener is executed in this executor |
|                     .messageListenerExecutor(partitionOrderedExecutor) |
|                     .subscribe(); |
| |
|     // for example: out-of-order |
|     ExecutorService executorService = Executors.newFixedThreadPool(10); |
|     Consumer<Long> outOfOrderConsumer = |
|             pulsarClient.newConsumer(Schema.INT64) |
|                     .topic(topic) |
|                     .subscriptionName(subscriptionName) |
|                     .messageListener((c1, msg) -> { |
|                         log.info("Received message [{}] in the listener", msg.getValue()); |
|                         c1.acknowledgeAsync(msg); |
|                     }) |
|                     // a plain thread pool: listener tasks may run on any of its threads, |
|                     // so processing order is not guaranteed |
|                     .messageListenerExecutor((message, runnable) -> executorService.execute(runnable)) |
|                     .subscribe(); |
| } |
| |
| private static MessageListenerExecutor getKeySharedMessageListenerExecutor(String subscriptionName) { |
|     ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "listener-executor-"); |
| |
|     return (message, runnable) -> { |
|         byte[] key = "".getBytes(StandardCharsets.UTF_8); |
|         // Key_Shared dispatch uses the ordering key when it is set |
|         // and falls back to the message key otherwise, so do the same here. |
|         if (message.hasOrderingKey()) { |
|             key = message.getOrderingKey(); |
|         } else if (message.hasKey()) { |
|             key = message.getKeyBytes(); |
|         } |
|         // Select a thread by the message key to execute the runnable. |
|         // That is, message listener tasks with the same ordering key |
|         // are executed by the same thread. |
|         ExecutorService executorService = executorProvider.getExecutor(key); |
|         // executorService is a SingleThreadExecutor |
|         executorService.execute(runnable); |
|     }; |
| } |
| |
| private static MessageListenerExecutor getPartitionOrderdMessageListenerExecutor(String subscriptionName) { |
|     ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "listener-executor-"); |
| |
|     return (message, runnable) -> { |
|         // Select a thread by the partition topic name to execute the runnable. |
|         // That is, message listener tasks from the same partition topic |
|         // are executed by the same thread. |
|         byte[] partitionKey = message.getTopicName().getBytes(StandardCharsets.UTF_8); |
|         ExecutorService executorService = executorProvider.getExecutor(partitionKey); |
|         // executorService is a SingleThreadExecutor |
|         executorService.execute(runnable); |
|     }; |
| } |
| ``` |
| ## Public-facing Changes |
| |
| ### Public API |
| |
| 1. Add an optional config `messageListenerExecutor` in `ConsumerBuilder` |
| ```java |
| ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor); |
| ``` |
| |
| # Backward & Forward Compatibility |
| Upgrading and reverting work as usual; no special steps are needed. |
| |
| # Links |
| |
| * Mailing List discussion thread: https://lists.apache.org/thread/8nhqfdhkglsg5bgx6z7c1nho7z7l596l |
| * Mailing List voting thread: https://lists.apache.org/thread/oo3jdvq3b6bv6p4n7x7sdvypw4gp6hpk |