blob: 74ac7298bacd79841b5049517d45bae715d6587e [file] [log] [blame] [view]
# PIP-412: Support setting messagePayloadProcessor in Pulsar Functions and Sinks
## Background knowledge
Pulsar Consumers support setting the `messagePayloadProcessor` to process the message payload, which is responsible for converting the raw buffer into structured messages, triggering appropriate callbacks for message consumption, and handling exceptions if they occur. The most critical aspect of this interface is the decoding of the raw buffer, which is then used to construct a `Message` object for consumption using methods like `MessagePayloadContext.getMessageAt()` or `MessagePayloadContext.asSingleMessage()`.
Pulsar Functions and Sinks will create a Pulsar Consumer to consume messages from input topic(s) but lacks of the ability to set the `messagePayloadProcessor`, which is not flexible.
## Motivation
The current implementation of Pulsar Functions and Sinks cannot set the `messagePayloadProcessor`, and require the message decoding logic to be tightly integrated with the function or sink itself. This coupling makes it challenging to extend or modify message processing logic without impacting the entire function or sink implementation.
Introducing support for setting a custom `messagePayloadProcessor` in Pulsar Functions and Sinks will provide the following benefits:
- **Modularity:** Decouples message decoding from function or sink logic.
- **Reusability:** Allows the same message decoding logic to be used across multiple functions or sinks.
- **Flexibility:** Enables developers to easily swap out different decoding strategies without rewriting core processing logic.
## Goals
### In Scope
- Allow users to specify custom message payload processor.
### Out of Scope
## High Level Design
Make users able to set `messagePayloadProcessor` for Pulsar Functions&Sinks.
## Detailed Design
### Design & Implementation Details
- Add a new message `MessagePayloadProcessorSpec` with below fields in `Function.proto`, and add it as a new filed `messagePayloadProcessorSpec` to the `ConsumerSpec` message
- `string className`
- `string configs`
- Add a new class `MessagePayloadProcessorConfig` with below fields and add it as a new field `messagePayloadProcessorConfig` to the `ConsumerConfig`:
- `String className`
- `Map<String, Object> configs`
And related logic also will be added:
convert the `messagePayloadProcessorSpec` field of the `ConsumerSpec` from `FunctionDetails` to the `messagePayloadProcessorConfig` field of the `ConsumerConfig` and vice versa
After the changes, users can set a custom message payload processor when creating the functions and sinks, like below using CLI arguments:
```shell
./bin/pulsar-admin functions create \
--tenant public \
--namespace default \
--name test-java \
--className org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/test-java-input \
--input-specs '{"persistent://public/default/test-java-input": {"messagePayloadProcessorConfig": {"className": "com.example.CustomPayloadProcessor"}}}' \
--jar /pulsar/examples/api-examples.jar
```
```shell
./bin/pulsar-admin sinks create \
--tenant public \
--namespace default \
--sink-type elastic_search \
--name elasticsearch-test-sink \
--sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index"}' \
--input-specs '{"persistent://public/default/test-es-input": {"messagePayloadProcessorConfig": {"className": "com.example.CustomPayloadProcessor", "configs": "{\"key1\":\"value1\"}"}}}'
```
Users can also use the function config file to set the message payload processor for functions:
```yaml
tenant: "public"
namespace: "default"
name: "test-java"
jar: "/pulsar/examples/api-examples.jar"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs: ["persistent://public/default/test-java-input"]
output: "persistent://public/default/test-java-output"
autoAck: true
parallelism: 1
inputSpecs:
persistent://public/default/test-java-input:
messagePayloadProcessorConfig:
className: "com.example.CustomPayloadProcessor"
```
And use source config file to set the batching configs for sources:
```yaml
tenant: "public"
namespace: "default"
name: "data-generator-source"
topicName: "persistent://public/default/data-source-topic"
archive: "builtin://data-generator"
parallelism: 1
configs:
sleepBetweenMessages: "5000"
inputSpecs:
persistent://public/default/data-source-topic:
messagePayloadProcessorConfig:
className: "com.example.CustomPayloadProcessor"
configs: "{\"key1\": \"value1\"}"
```
### Public-facing Changes
No public changes, the configs are passed via the existing `--input-specs` argument.
#### Configuration
#### Metrics
## Monitoring
## Security Considerations
## Backward & Forward Compatibility
When the `messagePayloadProcessor` is not passed via the `--input-specs`, it won't set the `messagePayloadProcessor` for the function or sink, which is backward compatible.
### Upgrade
### Downgrade / Rollback
### Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
No impact on geo-replication.
## Alternatives
None
## General Notes
This feature enhances the modularity of Pulsar Functions and Sinks and enables flexible message decoding strategies while maintaining backward compatibility.
## Links
- Mailing List discussion thread: https://lists.apache.org/thread/cdc3tpmvl2h4xqwvsnb8559w9zdyk5cv
- Mailing List voting thread: https://lists.apache.org/thread/dch023bf0jn76gynzbbfl2cqn4mzwtnf