Pulsar Functions and Sources currently enable batching unconditionally and hard-code batchingMaxPublishDelay to 10ms. For now, the only batching option users can set is the batch-builder. This does not suit every use case, and users have no way to adjust it.
Support setting batching configurations for Pulsar Functions and Sources, to make batching more flexible and better suited to users' needs.
Allow users to enable or disable batching and to set batching configurations for Pulsar Functions and Sources.
Add a new message BatchingSpec with the fields below to Function.proto, and add it as a new field batchingSpec to the ProducerSpec message:
bool enabled
int32 batchingMaxPublishDelayMs
int32 roundRobinRouterBatchingPartitionSwitchFrequency
int32 batchingMaxMessages
int32 batchingMaxBytes
string batchBuilder
Add a new class BatchingConfig with the fields below, and add it as a new field batchingConfig to the ProducerConfig:
bool enabled
int batchingMaxPublishDelayMs
int roundRobinRouterBatchingPartitionSwitchFrequency
int batchingMaxMessages
int batchingMaxBytes
String batchBuilder
The related conversion logic will also be added:
Convert the batchingSpec field of the ProducerSpec from FunctionDetails to the batchingConfig field of the ProducerConfig, and vice versa.
To keep compatibility, when the batchingSpec of the ProducerSpec is null while creating the ProducerConfig from the ProducerSpec, the batchingConfig field falls back to BatchingConfig(enabled=true, batchingMaxPublishDelayMs=10).
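The conversion and the compatibility fallback described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual Pulsar implementation: the class BatchingConversionSketch, the methods toConfig and toSpec, and the plain-Java stand-ins for BatchingSpec and BatchingConfig are all hypothetical. Mapping an empty batchBuilder string to null is also an assumption, based on proto3 string fields defaulting to the empty string.

```java
// Hypothetical sketch; the real classes are generated from Function.proto
// or live in the Pulsar Functions module and may look different.
final class BatchingConversionSketch {

    /** Stand-in for the protobuf-generated BatchingSpec message. */
    static final class BatchingSpec {
        boolean enabled;
        int batchingMaxPublishDelayMs;
        int roundRobinRouterBatchingPartitionSwitchFrequency;
        int batchingMaxMessages;
        int batchingMaxBytes;
        String batchBuilder = ""; // proto3 strings default to ""
    }

    /** Stand-in for the BatchingConfig carried by ProducerConfig. */
    static final class BatchingConfig {
        boolean enabled;
        int batchingMaxPublishDelayMs;
        int roundRobinRouterBatchingPartitionSwitchFrequency;
        int batchingMaxMessages;
        int batchingMaxBytes;
        String batchBuilder;
    }

    /** Spec -> config. A missing spec falls back to the old hard-coded behavior. */
    static BatchingConfig toConfig(BatchingSpec spec) {
        BatchingConfig config = new BatchingConfig();
        if (spec == null) {
            // Compatibility fallback: BatchingConfig(enabled=true, batchingMaxPublishDelayMs=10)
            config.enabled = true;
            config.batchingMaxPublishDelayMs = 10;
            return config;
        }
        config.enabled = spec.enabled;
        config.batchingMaxPublishDelayMs = spec.batchingMaxPublishDelayMs;
        config.roundRobinRouterBatchingPartitionSwitchFrequency =
                spec.roundRobinRouterBatchingPartitionSwitchFrequency;
        config.batchingMaxMessages = spec.batchingMaxMessages;
        config.batchingMaxBytes = spec.batchingMaxBytes;
        // Assumption: an empty string means "batch builder not set".
        config.batchBuilder = spec.batchBuilder.isEmpty() ? null : spec.batchBuilder;
        return config;
    }

    /** Config -> spec. Returns null when no batching config was supplied. */
    static BatchingSpec toSpec(BatchingConfig config) {
        if (config == null) {
            return null;
        }
        BatchingSpec spec = new BatchingSpec();
        spec.enabled = config.enabled;
        spec.batchingMaxPublishDelayMs = config.batchingMaxPublishDelayMs;
        spec.roundRobinRouterBatchingPartitionSwitchFrequency =
                config.roundRobinRouterBatchingPartitionSwitchFrequency;
        spec.batchingMaxMessages = config.batchingMaxMessages;
        spec.batchingMaxBytes = config.batchingMaxBytes;
        spec.batchBuilder = config.batchBuilder == null ? "" : config.batchBuilder;
        return spec;
    }

    public static void main(String[] args) {
        BatchingConfig fallback = toConfig(null);
        // prints "true 10"
        System.out.println(fallback.enabled + " " + fallback.batchingMaxPublishDelayMs);
    }
}
```

With this shape, a function created before the change (whose ProducerSpec carries no batchingSpec) would keep batching enabled with a 10ms publish delay, while explicitly supplied values survive a round trip through both conversions.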
After the changes, users can pass batching configurations when creating functions and sources, for example with CLI arguments:
./bin/pulsar-admin functions create \
  --tenant public \
  --namespace default \
  --name test-java \
  --className org.apache.pulsar.functions.api.examples.ExclamationFunction \
  --inputs persistent://public/default/test-java-input \
  --producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \
  --jar /pulsar/examples/api-examples.jar

./bin/pulsar-admin sources create \
  --name data-generator-source \
  --source-type data-generator \
  --destination-topic-name persistent://public/default/data-source-topic \
  --producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \
  --source-config '{"sleepBetweenMessages": "1000"}'
Users can also use the function config file to set the batching configs for functions:
tenant: "public"
namespace: "default"
name: "test-java"
jar: "/pulsar/examples/api-examples.jar"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs: ["persistent://public/default/test-java-input"]
output: "persistent://public/default/test-java-output"
autoAck: true
parallelism: 1
producerConfig:
  batchingConfig:
    enabled: true
    batchingMaxPublishDelayMs: 100
    roundRobinRouterBatchingPartitionSwitchFrequency: 10
    batchingMaxMessages: 1000
And use the source config file to set the batching configs for sources:
tenant: "public"
namespace: "default"
name: "data-generator-source"
topicName: "persistent://public/default/data-source-topic"
archive: "builtin://data-generator"
parallelism: 1
configs:
  sleepBetweenMessages: "5000"
producerConfig:
  batchingConfig:
    enabled: true
    batchingMaxPublishDelayMs: 100
    roundRobinRouterBatchingPartitionSwitchFrequency: 10
    batchingMaxMessages: 1000
No changes are needed to revert to the previous version.
No other changes are needed to upgrade to the new version.
None