Pulsar consumers support setting a messagePayloadProcessor to process the message payload. The processor converts the raw buffer into structured messages, triggers the appropriate callbacks for message consumption, and handles any exceptions that occur. The most critical part of this interface is decoding the raw buffer, from which a Message object is constructed for consumption using methods such as MessagePayloadContext.getMessageAt() or MessagePayloadContext.asSingleMessage().
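A minimal, self-contained Java sketch of the decoding step follows. It does not use the Pulsar client API. The PayloadProcessor interface and the length-prefixed framing ([4-byte length][UTF-8 bytes] per record) are hypothetical stand-ins chosen only for illustration. A real processor implements MessagePayloadProcessor and builds each Message through MessagePayloadContext.getMessageAt() or MessagePayloadContext.asSingleMessage() instead of emitting plain strings.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LengthPrefixedProcessorDemo {

    /** Hypothetical, simplified processor contract: decode a raw payload and emit messages. */
    interface PayloadProcessor {
        void process(ByteBuffer rawPayload, Consumer<String> messageConsumer) throws Exception;
    }

    /** Decodes a buffer of [int length][UTF-8 bytes] records into individual messages. */
    static class LengthPrefixedProcessor implements PayloadProcessor {
        @Override
        public void process(ByteBuffer rawPayload, Consumer<String> messageConsumer) {
            ByteBuffer buf = rawPayload.duplicate(); // leave the caller's position untouched
            while (buf.hasRemaining()) {
                if (buf.remaining() < Integer.BYTES) {
                    throw new IllegalArgumentException("truncated length prefix");
                }
                int length = buf.getInt();
                if (length < 0 || length > buf.remaining()) {
                    throw new IllegalArgumentException("invalid record length: " + length);
                }
                byte[] bytes = new byte[length];
                buf.get(bytes);
                // Each decoded record triggers the consumption callback.
                messageConsumer.accept(new String(bytes, StandardCharsets.UTF_8));
            }
        }
    }

    /** Encodes records with the same hypothetical framing, for the demo. */
    static ByteBuffer encode(String... records) {
        List<byte[]> encoded = new ArrayList<>();
        int size = 0;
        for (String r : records) {
            byte[] b = r.getBytes(StandardCharsets.UTF_8);
            encoded.add(b);
            size += Integer.BYTES + b.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (byte[] b : encoded) {
            buf.putInt(b.length).put(b);
        }
        buf.flip();
        return buf;
    }

    public static void main(String[] args) throws Exception {
        List<String> received = new ArrayList<>();
        new LengthPrefixedProcessor().process(encode("hello", "pulsar"), received::add);
        System.out.println(received); // [hello, pulsar]
    }
}
```

A malformed frame raises an exception instead of producing a partial message. This mirrors the exception-handling responsibility of the processor described above.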
Pulsar Functions and Sinks create a Pulsar consumer to consume messages from the input topic(s), but they cannot set a messagePayloadProcessor, which limits flexibility.
Because the current implementation of Pulsar Functions and Sinks cannot set a messagePayloadProcessor, the message decoding logic must be tightly integrated with the function or sink itself. This coupling makes it difficult to extend or modify the message processing logic without affecting the entire function or sink implementation.
Introducing support for setting a custom messagePayloadProcessor in Pulsar Functions and Sinks decouples message decoding from the function or sink implementation.
Allow users to set a messagePayloadProcessor for Pulsar Functions and Sinks.
Add a MessagePayloadProcessorSpec message with the following fields to Function.proto, and add it as a new field messagePayloadProcessorSpec to the ConsumerSpec message:
string className
string configs
Add a MessagePayloadProcessorConfig class with the following fields, and add it as a new field messagePayloadProcessorConfig to the ConsumerConfig class:
String className
Map<String, Object> configs
The related conversion logic will also be added: convert the messagePayloadProcessorSpec field of the ConsumerSpec in FunctionDetails to the messagePayloadProcessorConfig field of the ConsumerConfig, and vice versa.
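The two-way conversion can be sketched in self-contained Java. The record names mirror the proposed MessagePayloadProcessorSpec and MessagePayloadProcessorConfig, but this is not Pulsar's code. It assumes that the proto configs string holds a JSON object, as in the CLI examples. It also uses a tiny hand-written JSON codec that supports only flat objects with string values and the \" and \\ escapes. A real implementation would use a JSON library instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PayloadProcessorConfigConversion {

    // Hypothetical mirrors of the proposed types: the proto spec carries configs as a
    // (JSON) string, while the Java config carries them as a Map.
    record MessagePayloadProcessorSpec(String className, String configs) {}
    record MessagePayloadProcessorConfig(String className, Map<String, Object> configs) {}

    // ConsumerConfig -> ConsumerSpec; null means "no processor configured".
    static MessagePayloadProcessorSpec toSpec(MessagePayloadProcessorConfig config) {
        if (config == null) {
            return null;
        }
        String json = config.configs() == null ? "" : encode(config.configs());
        return new MessagePayloadProcessorSpec(config.className(), json);
    }

    // ConsumerSpec -> ConsumerConfig.
    static MessagePayloadProcessorConfig toConfig(MessagePayloadProcessorSpec spec) {
        return spec == null ? null
                : new MessagePayloadProcessorConfig(spec.className(), decode(spec.configs()));
    }

    // Minimal JSON encoder for flat maps; every value is written as a JSON string.
    static String encode(Map<String, Object> map) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, Object> e : map.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append(quote(e.getKey())).append(':').append(quote(String.valueOf(e.getValue())));
        }
        return sb.append('}').toString();
    }

    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Minimal JSON decoder for flat objects with string values; only \" and \\ escapes are handled.
    static Map<String, Object> decode(String json) {
        Map<String, Object> out = new LinkedHashMap<>();
        String s = json.trim();
        if (s.isEmpty()) {
            return out; // empty configs string -> empty map
        }
        if (!s.startsWith("{") || !s.endsWith("}")) {
            throw new IllegalArgumentException("expected a JSON object: " + json);
        }
        int end = s.length() - 1; // index of the closing brace
        int i = skipWs(s, 1);
        while (i < end) {
            StringBuilder key = new StringBuilder();
            i = skipWs(s, readString(s, i, key));
            if (s.charAt(i) != ':') {
                throw new IllegalArgumentException("expected ':' at index " + i);
            }
            StringBuilder value = new StringBuilder();
            i = skipWs(s, readString(s, skipWs(s, i + 1), value));
            out.put(key.toString(), value.toString());
            if (s.charAt(i) == ',') {
                i = skipWs(s, i + 1);
            }
        }
        return out;
    }

    static int skipWs(String s, int i) {
        while (i < s.length() && Character.isWhitespace(s.charAt(i))) i++;
        return i;
    }

    // Reads a quoted string starting at index i; returns the index just after its closing quote.
    static int readString(String s, int i, StringBuilder sb) {
        if (s.charAt(i) != '"') {
            throw new IllegalArgumentException("expected '\"' at index " + i);
        }
        for (int j = i + 1; j < s.length(); j++) {
            char c = s.charAt(j);
            if (c == '\\') {
                sb.append(s.charAt(++j)); // keep the escaped character itself
            } else if (c == '"') {
                return j + 1;
            } else {
                sb.append(c);
            }
        }
        throw new IllegalArgumentException("unterminated string");
    }

    public static void main(String[] args) {
        Map<String, Object> configs = Map.of("key1", "value1");
        MessagePayloadProcessorSpec spec =
                toSpec(new MessagePayloadProcessorConfig("com.example.CustomPayloadProcessor", configs));
        System.out.println(spec.configs());           // {"key1":"value1"}
        System.out.println(toConfig(spec).configs()); // {key1=value1}
    }
}
```

Keeping the proto field as a plain string avoids defining a nested message for arbitrary config values. The cost is that both directions must agree on a serialization format.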
After these changes, users can set a custom message payload processor when creating functions and sinks by passing CLI arguments, as shown below:
./bin/pulsar-admin functions create \
  --tenant public \
  --namespace default \
  --name test-java \
  --className org.apache.pulsar.functions.api.examples.ExclamationFunction \
  --inputs persistent://public/default/test-java-input \
  --input-specs '{"persistent://public/default/test-java-input": {"messagePayloadProcessorConfig": {"className": "com.example.CustomPayloadProcessor"}}}' \
  --jar /pulsar/examples/api-examples.jar
./bin/pulsar-admin sinks create \
  --tenant public \
  --namespace default \
  --sink-type elastic_search \
  --name elasticsearch-test-sink \
  --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index"}' \
  --input-specs '{"persistent://public/default/test-es-input": {"messagePayloadProcessorConfig": {"className": "com.example.CustomPayloadProcessor", "configs": "{\"key1\":\"value1\"}"}}}'
Users can also use the function config file to set the message payload processor for functions:
tenant: "public"
namespace: "default"
name: "test-java"
jar: "/pulsar/examples/api-examples.jar"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs: ["persistent://public/default/test-java-input"]
output: "persistent://public/default/test-java-output"
autoAck: true
parallelism: 1
inputSpecs:
  persistent://public/default/test-java-input:
    messagePayloadProcessorConfig:
      className: "com.example.CustomPayloadProcessor"
Similarly, users can use the sink config file to set the message payload processor for sinks:
tenant: "public"
namespace: "default"
name: "data-generator-sink"
inputs: ["persistent://public/default/data-source-topic"]
archive: "builtin://data-generator"
parallelism: 1
inputSpecs:
  persistent://public/default/data-source-topic:
    messagePayloadProcessorConfig:
      className: "com.example.CustomPayloadProcessor"
      configs: "{\"key1\": \"value1\"}"
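The proposal does not specify how the runtime turns className and configs into a processor instance. One plausible approach is reflection, sketched below in self-contained Java. PayloadProcessor is a hypothetical stand-in for the real MessagePayloadProcessor interface. The convention that a processor receiving configs exposes a public Map<String, Object> constructor is an assumption of this sketch, not part of the proposal. When no className is given, the loader returns Optional.empty(), consistent with the backward-compatible behavior of this proposal.

```java
import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

public class ProcessorLoaderDemo {

    /** Hypothetical stand-in for the processor contract (the real one is MessagePayloadProcessor). */
    public interface PayloadProcessor {
        String describe();
    }

    /** Hypothetical user class, playing the role of com.example.CustomPayloadProcessor. */
    public static class CustomPayloadProcessor implements PayloadProcessor {
        private final Map<String, Object> configs;

        public CustomPayloadProcessor() {
            this(Collections.emptyMap());
        }

        public CustomPayloadProcessor(Map<String, Object> configs) {
            this.configs = configs;
        }

        @Override
        public String describe() {
            return "CustomPayloadProcessor" + configs;
        }
    }

    /** Instantiates the processor named in messagePayloadProcessorConfig, if any. */
    static Optional<PayloadProcessor> load(String className, Map<String, Object> configs,
                                           ClassLoader classLoader) throws ReflectiveOperationException {
        if (className == null || className.isEmpty()) {
            return Optional.empty(); // nothing configured: build the consumer without a processor
        }
        Class<?> clazz = Class.forName(className, true, classLoader);
        if (!PayloadProcessor.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(className + " does not implement PayloadProcessor");
        }
        Object instance;
        if (configs != null && !configs.isEmpty()) {
            // Assumption: processors that accept configs expose a public (Map) constructor.
            Constructor<?> ctor = clazz.getConstructor(Map.class);
            instance = ctor.newInstance(configs);
        } else {
            instance = clazz.getConstructor().newInstance();
        }
        return Optional.of((PayloadProcessor) instance);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader cl = ProcessorLoaderDemo.class.getClassLoader();
        String name = CustomPayloadProcessor.class.getName();
        // CustomPayloadProcessor{key1=value1}
        System.out.println(load(name, Map.of("key1", "value1"), cl).map(PayloadProcessor::describe).orElse("none"));
        System.out.println(load(null, null, cl).isPresent()); // false
    }
}
```

Loading through an explicit ClassLoader matters because function and sink code is typically packaged separately from the runtime.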
No new CLI arguments are added; the configs are passed via the existing --input-specs argument.
When no messagePayloadProcessorConfig is passed via --input-specs, no messagePayloadProcessor is set for the function or sink, so the change is backward compatible.
No impact on geo-replication.
None
This feature enhances the modularity of Pulsar Functions and Sinks and enables flexible message decoding strategies while maintaining backward compatibility.