blob: 00a31224970989250101c2bac1868ede89e17289 [file] [log] [blame] [view]
# PIP-359: Support custom message listener executor for specific subscription
Implementation PR: [#22861](https://github.com/apache/pulsar/pull/22861)
# Background knowledge
In the current Pulsar client versions, from the user's perspective, when using a Pulsar Consumer,
we have two main options to consume messages:
1. Pull mode, by calling `consumer.receive()`(or `consumer.receiveAsync()`)
```java
public class ConsumerExample {
public static void main(String[] args) throws PulsarClientException {
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscribe();
do {
Message<Long> message = consumer.receive();
consumer.acknowledge(message);
} while (true);
}
}
```
2. Push mode, by registering a `MessageListener` interface, when building the Consumer.
When this method is used, we can't also use `consumer.receive()`(or `consumer.receiveAsync()`).
In the push mode, the MessageListener instance is called by the consumer, hence it is
doing that with a thread taken from its own internal `ExecutorService` (i.e. thread pool).
The problem comes when we build and use multiple Consumers from the same PulsarClient. It
so happens that those consumers will share the same thread pool to call the Message Listeners.
One can be slower from the other.
```java
public class ConsumerExample {
public static void main(String[] args) throws PulsarClientException {
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription2")
.messageListener((consumer, message) -> {
// process message
consumer.acknowledgeAsync(message);
})
.subscribe();
}
}
```
# Motivation
As [Background knowledge](#background-knowledge) mentioned, when using asynchronous consumer,
by registering a MessageListener interface, there is a problem of different consumer groups
affecting each other, leading to unnecessary consumption delays.
Therefore, for this scenario, this PIP prepare to support specific a message
listener executor of consumer latitudes to solve such problem.
# Goals
1. Improve consumer message listener isolation, solve the consumption delay problem caused by
mutual influence of different consumers from the same PulsarClient instance.
## In Scope
If this PIP is accepted, it will help Pulsar solve the problem of different consumers
from same `PulsarClient` affecting each other in the asynchronous consumption mode(`MessageListener`).
## Out of Scope
This PIP will not build the plugin library mentioned in [PR](https://github.com/apache/pulsar/pull/22902#issuecomment-2169962642),
we will open a new PIP in the future to do this
# Detailed Design
## Design & Implementation Details
1. Add an interface `MessageListenerExecutor`, responsible for executing message listener callback tasks.
Users can customize the implementation to determine in which thread the message listener task is executed.
For example, in the situation described in [Motivation](#motivation) part, users can implement the
interface with an independent underlying thread pool to ensure that the message listener task of each
consumer is executed in a separate thread. The caller would be responsible for the life cycle of the
Executor, and it would be used only for this specific consumer.
```java
public interface MessageListenerExecutor {
/**
* select a thread by message(if necessary, for example,
* Key_Shared SubscriptionType, maybe need select thread
* by message order key to ensure order) to execute the runnable!
*
* @param message the message
* @param runnable the runnable to execute
*/
void execute(Message<?> message, Runnable runnable);
}
```
2. Add an optional config `messageListenerExecutor` in `ConsumerBuilder`, then
users can pass their implementations.
```java
ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);
```
### Why need an interface like `MessageListenerExecutor`
Some people may wonder why not just use `java.util.concurrent.ExecutorService`,
but define an interface like `MessageListenerExecutor`.
The reason is that:
For sequential consumption scenarios, we need to ensure that messages with the same
key or the same partition are processed by the same thread to ensure order. If we
use `java.util.concurrent.ExecutorService`, refer to the following figure, we will not be able to make such guarantees,
because for ExecutorService, which thread to execute the task is not controlled by the user.
![](https://github.com/AuroraTwinkle/pulsar/assets/25919180/232854d6-01f2-4821-b2df-34d01dda1992)
![](https://github.com/AuroraTwinkle/pulsar/assets/25919180/204f5622-1e5a-4e73-b86b-15220bfb06d6)
### Interface implementation suggestions
When implementing the `MessageListenerExecutor` interface, you should consider the following points.
1. if you need to ensure the order of message processing,
you can select the thread by the message order key or `msg.getTopicName()`(partition topic name),
to ensure that the messages of the same order key (or partition) are processed in same thread.
### Usage Example
```java
private void startConsumerWithMessageListener(String topic, String subscriptionName) throws PulsarClientException {
// for example: key_shared
MessageListenerExecutor keySharedExecutor = getKeySharedMessageListenerExecutor(subscriptionName);
Consumer<Long> keySharedconsumer =
pulsarClient.newConsumer(Schema.INT64)
.topic(topic)
.subscriptionName(subscriptionName)
// set and then message lister will be executed in the executor
.messageListener((c1, msg) -> {
log.info("Received message [{}] in the listener", msg.getValue());
c1.acknowledgeAsync(msg);
})
.messageListenerExecutor(keySharedExecutor)
.subscribe();
// for example: partition_ordered
MessageListenerExecutor partitionOrderedExecutor = getPartitionOrderdMessageListenerExecutor(subscriptionName);
Consumer<Long> partitionOrderedConsumer =
pulsarClient.newConsumer(Schema.INT64)
.topic(topic)
.subscriptionName(subscriptionName)
// set and then message lister will be executed in the executor
.messageListener((c1, msg) -> {
log.info("Received message [{}] in the listener", msg.getValue());
c1.acknowledgeAsync(msg);
})
.messageListenerExecutor(partitionOrderedExecutor)
.subscribe();
// for example: out-of-order
ExecutorService executorService = Executors.newFixedThreadPool(10);
Consumer<Long> outOfOrderConsumer =
pulsarClient.newConsumer(Schema.INT64)
.topic(topic)
.subscriptionName(subscriptionName)
// not set and then message lister will be executed in the default executor
.messageListener((c1, msg) -> {
log.info("Received message [{}] in the listener", msg.getValue());
c1.acknowledgeAsync(msg);
})
.messageListenerExecutor((message, runnable) -> executorService.execute(runnable))
.subscribe();
}
private static MessageListenerExecutor getKeySharedMessageListenerExecutor(String subscriptionName) {
ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "listener-executor-");
return (message, runnable) -> {
byte[] key = "".getBytes(StandardCharsets.UTF_8);
if (message.hasKey()) {
key = message.getKeyBytes();
} else if (message.hasOrderingKey()) {
key = message.getOrderingKey();
}
// select a thread by message key to execute the runnable!
// that say, the message listener task with same order key
// will be executed by the same thread
ExecutorService executorService = executorProvider.getExecutor(key);
// executorService is a SingleThreadExecutor
executorService.execute(runnable);
};
}
private static MessageListenerExecutor getPartitionOrderdMessageListenerExecutor(String subscriptionName) {
ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "listener-executor-");
return (message, runnable) -> {
// select a thread by partition topic name to execute the runnable!
// that say, the message listener task from the same partition topic
// will be executed by the same thread
ExecutorService executorService = executorProvider.getExecutor(message.getTopicName().getBytes());
// executorService is a SingleThreadExecutor
executorService.execute(runnable);
};
}
```
## Public-facing Changes
### Public API
1. Add an optional config `messageListenerExecutor` in `ConsumerBuilder`
```java
ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);
```
# Backward & Forward Compatibility
You can do upgrading or reverting normally, no specified steps are needed to do.
# Links
* Mailing List discussion thread: https://lists.apache.org/thread/8nhqfdhkglsg5bgx6z7c1nho7z7l596l
* Mailing List voting thread: https://lists.apache.org/thread/oo3jdvq3b6bv6p4n7x7sdvypw4gp6hpk