Pulsar Functions has been a convenient tool for simple real-time message processing scenarios. It allows users to implement business logic against a minimal interface and submit it directly to an existing Pulsar cluster for execution. The simplicity is twofold: a simple interface and simple deployment. Users don't need to learn a complicated bundle of new interfaces to express their simple jobs, and they don't need to set up and maintain a separate stream processing cluster to run their functions.
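For example, with the Java SDK a function only needs to implement the Function interface. The ExclamationFunction referenced later in this document looks roughly like the following sketch:

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

// A minimal Pulsar Function: appends an exclamation mark to every input message.
public class ExclamationFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) {
        return input + "!";
    }
}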
As functions have been in use for some time, we have realized that native support for organizing multiple functions together is in high demand. With this support, people can express and manage multi-stage jobs easily. It also opens the possibility of a higher-level DSL to further simplify job composition. We call this new feature Pulsar Function Mesh.
In the following sections, we will discuss the design of the Pulsar Function Mesh.
The Pulsar Function Mesh reuses the existing Pulsar Function as its fundamental computing unit. On top of the existing function runtimes, we need to add or change the following parts to support Function Mesh:
These changes require minimal development work while enabling users to manage Function Meshes within the existing Pulsar cluster.
To define a mesh, the user provides a YAML file that expresses its composition. The major fields are shown in the following example file:
# Metadata
name: PIP_Mesh
namespace: PIP_Namespace
tenant: PIP_Tenant

# Function Mesh configs
jarFile: /local/jar/files/example.jar

# Functions
functionInfos:
  - name: Func1
    classname: org.apache.pulsar.functions.api.examples.ExclamationFunction
    replicas: 1
    inputs:
      - pulsar_topic_source
    output:
      - pulsar_topic_1
  - name: Func2
    classname: org.apache.pulsar.functions.api.examples.ExclamationFunction
    replicas: 1
    inputs:
      - pulsar_topic_1
    output:
      - pulsar_topic_result
The above YAML file describes the following Function Mesh: Func1 reads the original data from pulsar_topic_source and processes it. The processed data is pushed to another Pulsar topic, pulsar_topic_1. Func2 then processes that data and pushes the final result into pulsar_topic_result for others to access. The topology of this Function Mesh is demonstrated as follows:
Source_Topic ------> Func_1 ------> Topic_1 ------> Func_2 ------> Sink_Topic
One thing that needs attention is that each Pulsar Function Mesh is self-contained and should not reference a function unit instance in another Function Mesh. This keeps the complexity of a single job, and of all jobs across a namespace, bounded. If a mesh needs to interact with another mesh, simply create a new mesh that consumes the Pulsar topics produced by the other one.
A new subcommand will be added to the pulsar-admin client tool to allow users to submit a Function Mesh to the Pulsar cluster. For minimal usage, three mesh commands are needed: create, delete, and list. More commands can be added as needed.
bin/pulsar-admin function-mesh create -f mesh.yaml
bin/pulsar-admin function-mesh delete --tenant default --namespace public --name pip-mesh
bin/pulsar-admin function-mesh list --tenant default --namespace public
Corresponding to the new CLI commands, the HTTP admin endpoints also need to handle these requests. More endpoint APIs can be added as needed.
public void registerFunctionMesh(...)
public void deregisterFunctionMesh(...)
public List<String> listFunctionMesh(...)
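As a rough illustration, the new endpoints could be exposed as a JAX-RS resource in the Function Worker, similar to the existing admin resources. The paths, parameters, and class name below are assumptions for the sketch, not the final API:

import java.util.Collections;
import java.util.List;

import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;

// Hypothetical sketch of the Function Mesh admin endpoints.
@Path("/function-mesh")
public class FunctionMeshApiResource {

    @POST
    @Path("/{tenant}/{namespace}/{meshName}")
    public void registerFunctionMesh(@PathParam("tenant") String tenant,
                                     @PathParam("namespace") String namespace,
                                     @PathParam("meshName") String meshName) {
        // Parse the submitted mesh definition and hand it to the FunctionMeshManager.
    }

    @DELETE
    @Path("/{tenant}/{namespace}/{meshName}")
    public void deregisterFunctionMesh(@PathParam("tenant") String tenant,
                                       @PathParam("namespace") String namespace,
                                       @PathParam("meshName") String meshName) {
        // Remove the mesh metadata and tear down its function units.
    }

    @GET
    @Path("/{tenant}/{namespace}")
    public List<String> listFunctionMesh(@PathParam("tenant") String tenant,
                                         @PathParam("namespace") String namespace) {
        // Return the names of all meshes registered under the namespace.
        return Collections.emptyList();
    }
}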
We introduce a FunctionMeshManager in the Function Worker to manage the metadata of a Function Mesh. To actually run the functions inside the mesh, we largely reuse the existing function runtimes to execute each function unit. The architecture is as follows:
The FunctionMeshManager is the entry point for a submitted mesh and is mainly responsible for the following tasks:
One optimization for Function Mesh is to group several functions together in one FunctionRunner process, which reduces latency as well as the number of internal Pulsar topics used. With this optimization, the FunctionRunner still has one source and one sink, but executes a list of functions together sequentially, as sketched below.
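The following is a minimal sketch of that grouping idea, assuming a hypothetical ChainedFunctionRunner class (the name and wiring are illustrative, not part of the existing runtime): one runner consumes from a single source, passes each record through a chain of functions in order, and publishes only the final result to the sink.

import java.util.List;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

// Hypothetical sketch: a runner that executes several functions sequentially
// inside one process, so no intermediate Pulsar topics are needed.
public class ChainedFunctionRunner {

    private final List<Function<Object, Object>> chain;

    public ChainedFunctionRunner(List<Function<Object, Object>> chain) {
        this.chain = chain;
    }

    // Called for every record read from the source topic; the return value is
    // what gets written to the sink topic.
    public Object process(Object input, Context context) throws Exception {
        Object result = input;
        for (Function<Object, Object> fn : chain) {
            result = fn.process(result, context);
        }
        return result;
    }
}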