PIP-401: Support set batching configurations for Pulsar Functions&Sources

Background knowledge

Pulsar Functions and Sources enable the batching feature hard-coded, and also set the batchingMaxPublishDelay to 10ms, it only supports set the batch-builder for now, this is not suitable for all the use cases, and also not feasible for users.

Motivation

Support setting batching configurations for Pulsar Functions&Sources, to make it more flexible and suitable for users.

Goals

In Scope

  • Support setting batching configurations for Pulsar Functions&Sources.

High Level Design

Make users able to enable&disable batching and set batching configurations for Pulsar Functions&Sources.

Detailed Design

Design & Implementation Details

  • Add a new message BatchingSpec with below fields in Function.proto, and add it as a new filed batchingSpec to the ProducerSpec message
    • bool enabled
    • int32 batchingMaxPublishDelayMs
    • int32 roundRobinRouterBatchingPartitionSwitchFrequency
    • int32 batchingMaxMessages
    • int32 batchingMaxBytes
    • string batchBuilder
  • Add a new class BatchingConfig with below fields and add it as a new field batchingConfig to the ProducerConfig:
    • bool enabled
    • int batchingMaxPublishDelayMs
    • int roundRobinRouterBatchingPartitionSwitchFrequency
    • int batchingMaxMessages
    • int batchingMaxBytes
    • String batchBuilder

And related logic also will be added:

  • convert the batchingSpec field of the ProducerSpec from FunctionDetails to the batchingConfig field of the ProducerConfig and vice versa

To keep the compatibility, when the batchingSpec of the ProducerSpec is null when creating the ProducerConfig from the ProducerSpec, the batchingConfig field will be fallback to: BatchingConfig(enabled=true, batchingMaxPublishDelayMs=10).

After the changes, users can pass the batching configurations when creating the functions and sources, like below using CLI arguments:

./bin/pulsar-admin functions create \
    --tenant public \
    --namespace default \
    --name test-java \
    --className org.apache.pulsar.functions.api.examples.ExclamationFunction \
    --inputs persistent://public/default/test-java-input \
    --producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \
    --jar /pulsar/examples/api-examples.jar
./bin/pulsar-admin sources create \
    --name data-generator-source \
    --source-type data-generator \
    --destination-topic-name persistent://public/default/data-source-topic \
    --producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \
    --source-config '{"sleepBetweenMessages": "1000"}'

Users can also use the function config file to set the batching configs for functions:

tenant: "public"
namespace: "default"
name: "test-java"
jar: "/pulsar/examples/api-examples.jar"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs: ["persistent://public/default/test-java-input"]
output: "persistent://public/default/test-java-output"
autoAck: true
parallelism: 1
producerConfig:
  batchingConfig:
    enabled: true
    batchingMaxPublishDelayMs: 100
    roundRobinRouterBatchingPartitionSwitchFrequency: 10
    batchingMaxMessages: 1000

And use source config file to set the batching configs for sources:

tenant: "public"
namespace: "default"
name: "data-generator-source"
topicName: "persistent://public/default/data-source-topic"
archive: "builtin://data-generator"
parallelism: 1
configs:
  sleepBetweenMessages: "5000"
producerConfig:
  batchingConfig:
    enabled: true
    batchingMaxPublishDelayMs: 100
    roundRobinRouterBatchingPartitionSwitchFrequency: 10
    batchingMaxMessages: 1000

Public-facing Changes

CLI

Monitoring

Security Considerations

Backward & Forward Compatibility

Revert

No changes are needed to revert to the previous version.

Upgrade

No other changes are needed to upgrade to the new version.

Alternatives

None

General Notes

Links