Currently, when users want to modify the data in Pulsar, they need to write a Function. For many use cases, it would be handy to have a ready-made built-in Function that implements the most common basic transformations, like the ones available in Kafka Connect's SMTs. This relieves users of the burden of writing the Function themselves, understanding the quirks of Pulsar Schemas, and coding in a language that they may not master (probably Java if they want to do advanced stuff). They also benefit from battle-tested, maintained, performance-optimised code.
This PIP is about providing a TransformFunction that executes a sequence of basic transformations on the data. The TransformFunction shall be easy to configure and launchable as a built-in NAR. It shall be able to apply a sequence of common transformations in memory, so that we don't need to execute the TransformFunction multiple times and read from and write to a topic each time.
This PIP is not about appending such a Function to a Source or a Sink. While this is the ultimate goal, so we can provide an experience similar to Kafka SMTs and avoid a read/write to a topic, this work will be done in a future PIP. It is expected that the code written for this PIP will be reusable in this future work.
This PIP will introduce a new transform module in the pulsar-function multi-module project.
The produced artifact will be a NAR of the TransformFunction.
When it processes a record, TransformFunction will:
1. Call in sequence the process method of a series of TransformStep implementations. Each TransformStep will modify the output message and topic as needed.
2. Send the transformed message to the output topic.
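
The processing loop described above can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not the code of this PIP: TransformContext, its fields, the exact signature of process, and the two lambda steps are hypothetical, and a real implementation would operate on Pulsar records and schemas rather than on plain objects.

```java
import java.util.List;

// Hypothetical mutable holder for the message being transformed.
// A real implementation would also carry the Pulsar schemas of key and value.
class TransformContext {
    String outputTopic;
    Object key;
    Object value;

    TransformContext(String outputTopic, Object key, Object value) {
        this.outputTopic = outputTopic;
        this.key = key;
        this.value = value;
    }
}

// One basic transformation. The method name follows the PIP text;
// the parameter type is an assumption made for this sketch.
interface TransformStep {
    void process(TransformContext context);
}

class TransformPipelineSketch {
    // Applies every step in order, in memory, on the same context.
    static TransformContext apply(List<TransformStep> steps, TransformContext context) {
        for (TransformStep step : steps) {
            step.process(context);
        }
        // The caller would now send context.value to context.outputTopic.
        return context;
    }

    public static void main(String[] args) {
        List<TransformStep> steps = List.of(
                // Illustrative step: modifies the output message.
                ctx -> ctx.value = ctx.value.toString().toUpperCase(),
                // Illustrative step: modifies the output topic.
                ctx -> ctx.outputTopic = ctx.outputTopic + "-transformed");
        TransformContext result = apply(steps, new TransformContext("out", "k1", "hello"));
        System.out.println(result.outputTopic + " <- " + result.value);
        // prints "out-transformed <- HELLO"
    }
}
```

Because all steps share one context, intermediate results never leave the process, which is what avoids a topic read/write between transformations.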
The TransformFunction will read its configuration as JSON from userConfig in the format:
    {
      "steps": [
        { "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key" },
        { "type": "merge-key-value" },
        { "type": "unwrap-key-value" },
        { "type": "cast", "schema-type": "STRING" }
      ]
    }
Each step is defined by its type and uses its own arguments.
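
To illustrate how a step's type can select which arguments are read, here is a minimal sketch that turns an already-parsed configuration (nested maps and lists, as a JSON parser would produce) into typed step descriptions. All class names are hypothetical, and treating fields and schema-type as mandatory is an assumption drawn only from the example config.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical step descriptions; names and fields are illustrative only.
interface StepSpec {}

class DropFieldsSpec implements StepSpec {
    final List<String> fields;
    final String part; // copied from the config as-is, may be null

    DropFieldsSpec(List<String> fields, String part) {
        this.fields = fields;
        this.part = part;
    }
}

class MergeKeyValueSpec implements StepSpec {}

class UnwrapKeyValueSpec implements StepSpec {}

class CastSpec implements StepSpec {
    final String schemaType;

    CastSpec(String schemaType) {
        this.schemaType = schemaType;
    }
}

class StepSpecParser {
    // Reads the "steps" list and dispatches on each entry's "type".
    static List<StepSpec> parse(Map<String, Object> userConfig) {
        Object steps = userConfig.get("steps");
        if (!(steps instanceof List)) {
            throw new IllegalArgumentException("'steps' must be a list");
        }
        List<StepSpec> specs = new ArrayList<>();
        for (Object entry : (List<?>) steps) {
            if (!(entry instanceof Map)) {
                throw new IllegalArgumentException("Each step must be an object");
            }
            Map<?, ?> step = (Map<?, ?>) entry;
            String type = String.valueOf(step.get("type"));
            switch (type) {
                case "drop-fields":
                    // "fields" is a comma-separated list in the example config.
                    specs.add(new DropFieldsSpec(
                            Arrays.asList(required(step, "fields").split(",")),
                            (String) step.get("part")));
                    break;
                case "merge-key-value":
                    specs.add(new MergeKeyValueSpec());
                    break;
                case "unwrap-key-value":
                    specs.add(new UnwrapKeyValueSpec());
                    break;
                case "cast":
                    specs.add(new CastSpec(required(step, "schema-type")));
                    break;
                default:
                    throw new IllegalArgumentException("Unknown step type: " + type);
            }
        }
        return specs;
    }

    // Returns the named argument as a string, or fails if it is absent.
    private static String required(Map<?, ?> step, String name) {
        Object value = step.get(name);
        if (value == null) {
            throw new IllegalArgumentException("Missing argument '" + name + "'");
        }
        return value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> userConfig = Map.of("steps", List.of(
                Map.of("type", "drop-fields", "fields", "keyField1,keyField2", "part", "key"),
                Map.of("type", "merge-key-value"),
                Map.of("type", "unwrap-key-value"),
                Map.of("type", "cast", "schema-type", "STRING")));
        System.out.println(parse(userConfig).size() + " steps configured");
        // prints "4 steps configured"
    }
}
```

Failing fast on an unknown type or a missing argument surfaces configuration mistakes when the function starts rather than when the first record arrives.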
This example config applied on a KeyValue<AVRO, AVRO> input record with value {key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} will give after each step:
| Step | Record after the step |
| --- | --- |
| (input record) | {key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>) |
| "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key" | {key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>) |
| "type": "merge-key-value" | {key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>) |
| "type": "unwrap-key-value" | {keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO) |
| "type": "cast", "schema-type": "STRING" | {"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING) |
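
The same four steps can be replayed on a simplified in-memory model to check the expected results. In this sketch AVRO records are represented as ordered maps of strings and schemas are ignored. The class and method names are hypothetical. Letting the value field win when a key field and a value field share a name is an assumption, since the example contains no such clash.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Simplified model: a record is an ordered map of field name to string value.
class TransformExampleSketch {
    // drop-fields: returns a copy of the record without the listed fields.
    static Map<String, String> dropFields(Map<String, String> record, List<String> fields) {
        Map<String, String> copy = new LinkedHashMap<>(record);
        copy.keySet().removeAll(fields);
        return copy;
    }

    // merge-key-value: the new value holds the key fields followed by the value fields.
    // On a name clash the value field overwrites the key field (assumption).
    static Map<String, String> mergeKeyValue(Map<String, String> key, Map<String, String> value) {
        Map<String, String> merged = new LinkedHashMap<>(key);
        merged.putAll(value);
        return merged;
    }

    // cast to STRING: renders the record as a JSON-like string.
    // No escaping is done and only string values are supported in this sketch.
    static String castToString(Map<String, String> record) {
        return record.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\": \"" + e.getValue() + "\"")
                .collect(Collectors.joining(", ", "{", "}"));
    }

    public static void main(String[] args) {
        Map<String, String> key = new LinkedHashMap<>();
        key.put("keyField1", "key1");
        key.put("keyField2", "key2");
        key.put("keyField3", "key3");
        Map<String, String> value = new LinkedHashMap<>();
        value.put("valueField1", "value1");
        value.put("valueField2", "value2");
        value.put("valueField3", "value3");

        // Step 1: drop-fields applied to the key part only.
        key = dropFields(key, List.of("keyField1", "keyField2"));
        // Step 2: merge-key-value copies the remaining key fields into the value.
        value = mergeKeyValue(key, value);
        // Step 3: unwrap-key-value keeps only the value part of the pair.
        Map<String, String> unwrapped = value;
        // Step 4: cast to STRING.
        System.out.println(castToString(unwrapped));
        // prints {"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"}
    }
}
```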
TransformFunction will be built as a NAR including a pulsar-io.yaml service file so it can be registered as a built-in function with name transform.
Create a separate third party project not managed by the Pulsar community. Problems: