PIP-412: Support setting messagePayloadProcessor in Pulsar Functions and Sinks

Background knowledge

Pulsar Consumers support setting the messagePayloadProcessor to process the message payload, which is responsible for converting the raw buffer into structured messages, triggering appropriate callbacks for message consumption, and handling exceptions if they occur. The most critical aspect of this interface is the decoding of the raw buffer, which is then used to construct a Message object for consumption using methods like MessagePayloadContext.getMessageAt() or MessagePayloadContext.asSingleMessage().

Pulsar Functions and Sinks will create a Pulsar Consumer to consume messages from input topic(s) but lacks of the ability to set the messagePayloadProcessor, which is not flexible.

Motivation

The current implementation of Pulsar Functions and Sinks cannot set the messagePayloadProcessor, and require the message decoding logic to be tightly integrated with the function or sink itself. This coupling makes it challenging to extend or modify message processing logic without impacting the entire function or sink implementation.

Introducing support for setting a custom messagePayloadProcessor in Pulsar Functions and Sinks will provide the following benefits:

  • Modularity: Decouples message decoding from function or sink logic.
  • Reusability: Allows the same message decoding logic to be used across multiple functions or sinks.
  • Flexibility: Enables developers to easily swap out different decoding strategies without rewriting core processing logic.

Goals

In Scope

  • Allow users to specify custom message payload processor.

Out of Scope

High Level Design

Make users able to set messagePayloadProcessor for Pulsar Functions&Sinks.

Detailed Design

Design & Implementation Details

  • Add a new message MessagePayloadProcessorSpec with below fields in Function.proto, and add it as a new filed messagePayloadProcessorSpec to the ConsumerSpec message
    • string className
    • string configs
  • Add a new class MessagePayloadProcessorConfig with below fields and add it as a new field messagePayloadProcessorConfig to the ConsumerConfig:
    • String className
    • Map<String, Object> configs

And related logic also will be added:

convert the messagePayloadProcessorSpec field of the ConsumerSpec from FunctionDetails to the messagePayloadProcessorConfig field of the ConsumerConfig and vice versa

After the changes, users can set a custom message payload processor when creating the functions and sinks, like below using CLI arguments:

./bin/pulsar-admin functions create \
    --tenant public \
    --namespace default \
    --name test-java \
    --className org.apache.pulsar.functions.api.examples.ExclamationFunction \
    --inputs persistent://public/default/test-java-input \
    --input-specs '{"persistent://public/default/test-java-input": {"messagePayloadProcessorConfig": {"className": "com.example.CustomPayloadProcessor"}}}' \
    --jar /pulsar/examples/api-examples.jar
./bin/pulsar-admin sinks create \
    --tenant public \
    --namespace default \
    --sink-type elastic_search \
    --name elasticsearch-test-sink \
    --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index"}' \
    --input-specs '{"persistent://public/default/test-es-input": {"messagePayloadProcessorConfig": {"className": "com.example.CustomPayloadProcessor", "configs": "{\"key1\":\"value1\"}"}}}'

Users can also use the function config file to set the message payload processor for functions:

tenant: "public"
namespace: "default"
name: "test-java"
jar: "/pulsar/examples/api-examples.jar"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs: ["persistent://public/default/test-java-input"]
output: "persistent://public/default/test-java-output"
autoAck: true
parallelism: 1
inputSpecs:
  persistent://public/default/test-java-input:
    messagePayloadProcessorConfig:
      className: "com.example.CustomPayloadProcessor"

And use source config file to set the batching configs for sources:

tenant: "public"
namespace: "default"
name: "data-generator-source"
topicName: "persistent://public/default/data-source-topic"
archive: "builtin://data-generator"
parallelism: 1
configs:
  sleepBetweenMessages: "5000"
inputSpecs:
  persistent://public/default/data-source-topic:
    messagePayloadProcessorConfig:
      className: "com.example.CustomPayloadProcessor"
      configs: "{\"key1\": \"value1\"}"

Public-facing Changes

No public changes, the configs are passed via the existing --input-specs argument.

Configuration

Metrics

Monitoring

Security Considerations

Backward & Forward Compatibility

When the messagePayloadProcessor is not passed via the --input-specs, it won't set the messagePayloadProcessor for the function or sink, which is backward compatible.

Upgrade

Downgrade / Rollback

Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

No impact on geo-replication.

Alternatives

None

General Notes

This feature enhances the modularity of Pulsar Functions and Sinks and enables flexible message decoding strategies while maintaining backward compatibility.

Links