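Pulsar IO connectors make it possible to connect Pulsar to an external system: a Source reads from an external system and writes to a Pulsar topic, while a Sink reads from a Pulsar topic and writes to an external system.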
Pulsar also has a lightweight computing system named Pulsar Functions. A Pulsar Function reads from one or more topics, applies user logic written in Java, Python or Go and writes to an output topic.
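When using Pulsar IO connectors, the format of the data read from a Source or written to a Sink is defined by the connector code. But in many situations a user wants to transform this data before using it. Currently the solution is to either write a custom connector that applies the transformation, or deploy a Function that transforms the data and writes it to an intermediary topic consumed by the connector.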
Considering all this, it would be handy to be able to apply a Function on-the-fly to a connector without going through an intermediary topic.
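This PIP defines the changes needed to apply a preprocessing Function on-the-fly to a Sink. The preprocessing Function can be a built-in function, a package function, or loaded through an HTTP URL or a file path. Sources, Sinks and Functions are based on the same runtime process that:

- reads from a source: for Functions and Sinks, this is a Pulsar source consuming a Pulsar topic;
- applies a function: for Sources and Sinks, this is `IdentityFunction`, which returns its input unchanged;
- writes to a sink: for Functions and Sources, this is a Pulsar sink producing to a Pulsar topic.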
This PIP reuses this mechanism and allows Sinks to be configured with a Function other than `IdentityFunction`. Only Functions returning a `Record` will be accepted, to ensure that the Function sets the Schema explicitly.
Out of the scope of this PIP, for future work:
The following options will be added to the `pulsar-admin sinks` CLI commands `create`, `update` and `localrun`:
- `preprocess-function`: the preprocess function applied before the Sink. It starts with `builtin://` for built-in functions, `function://` for package functions, `http://` or `file://`.
- `preprocess-function-classname`: the preprocess function class name (optional if the function is a NAR).
- `preprocess-function-config`: the configuration of the preprocess function, in the same format as the `user-config` parameter of the `functions create` CLI command.

The corresponding fields will be added to `SinkConfig`:

```java
private String preprocessFunction;
private String preprocessFunctionClassName;
private String preprocessFunctionConfig;
```
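As a rough illustration of the `preprocess-function` prefixes listed above, the sketch below classifies a value by its prefix. The enum `PreprocessFunctionSource` and its methods are hypothetical names, not part of Pulsar; only the four prefixes come from this PIP. Rejecting values without one of these prefixes is an assumption made for the sketch.

```java
/** Hypothetical classification of a preprocess-function value by its URL prefix. */
enum PreprocessFunctionSource {
    BUILTIN("builtin://"),   // built-in function
    PACKAGE("function://"),  // function stored in package management
    HTTP("http://"),         // function downloaded from an HTTP URL
    FILE("file://");         // function loaded from a file path

    private final String prefix;

    PreprocessFunctionSource(String prefix) {
        this.prefix = prefix;
    }

    /** Returns the kind whose prefix matches the value; throws if none matches (an assumption). */
    static PreprocessFunctionSource of(String preprocessFunction) {
        if (preprocessFunction == null) {
            throw new IllegalArgumentException("preprocess-function is not set");
        }
        for (PreprocessFunctionSource source : values()) {
            if (preprocessFunction.startsWith(source.prefix)) {
                return source;
            }
        }
        throw new IllegalArgumentException("Unsupported preprocess-function: " + preprocessFunction);
    }

    /** Removes this kind's prefix, e.g. "file:///tmp/fn.jar" becomes "/tmp/fn.jar". */
    String stripPrefix(String preprocessFunction) {
        return preprocessFunction.substring(prefix.length());
    }
}
```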
A field `extraFunctionPackageLocation` will be added to the protobuf structure `FunctionMetaData`. It will be filled with the location of the extra function to apply when a sink is registered, and used in the `Runtime` to load the function code.

```protobuf
message FunctionMetaData {
    ...
    PackageLocationMetaData extraFunctionPackageLocation = 7;
}
```
The parameters `extraFunctionFile` and `originalExtraFunctionFileName` will be added to `RuntimeFactory::createContainer`:

```java
Runtime createContainer(
        InstanceConfig instanceConfig,
        String codeFile,
        String originalCodeFileName,
        String extraFunctionFile,
        String originalExtraFunctionFileName,
        Long expectedHealthCheckInterval) throws Exception;
```
A field `extraFunctionId`, holding the UUID cache key of the extra function, will be added to `InstanceConfig`:

```java
public class InstanceConfig {
    private int instanceId;
    private String functionId;
    private String extraFunctionId;
    // ... other existing fields
}
```
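The role of `extraFunctionId` as a cache key can be pictured as a map from the UUID to a class loader that is created once and then reused. `ExtraFunctionClassLoaderCache` and its methods are hypothetical stand-ins for the `FunctionCacheManager`, whose actual API is not described in this PIP; the sketch only illustrates the create-once, reuse-afterwards behavior.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical cache of extra-function class loaders keyed by extraFunctionId. */
class ExtraFunctionClassLoaderCache {
    private final ConcurrentMap<String, ClassLoader> loaders = new ConcurrentHashMap<>();
    private final ClassLoader parent;

    ExtraFunctionClassLoaderCache(ClassLoader parent) {
        this.parent = parent;
    }

    /** A fresh cache key, like the UUID stored in InstanceConfig.extraFunctionId. */
    static String newExtraFunctionId() {
        return UUID.randomUUID().toString();
    }

    /** Returns the loader cached under the key, creating it from the jar path on first use. */
    ClassLoader getOrCreate(String extraFunctionId, String extraFunctionJar) {
        return loaders.computeIfAbsent(extraFunctionId, id -> {
            try {
                URL jarUrl = new File(extraFunctionJar).toURI().toURL();
                return new URLClassLoader(new URL[] {jarUrl}, parent);
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Invalid jar path: " + extraFunctionJar, e);
            }
        });
    }

    int size() {
        return loaders.size();
    }
}
```

Keying by a per-instance UUID rather than by jar path means two instances never share a loader by accident, even if they point at the same file.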
The following parameters will be added to `JavaInstanceStarter`:

- `--extra_function_jar`: the path to the extra function jar
- `--extra_function_id`: the extra function UUID cache key

These parameters are then used by the `ThreadRuntime` to load the function from the `FunctionCacheManager`, or to create it there if needed.
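The two new `JavaInstanceStarter` parameters only need to appear on the command line when a sink has an extra function. The helper below is a hypothetical sketch of that conditional append (`ExtraFunctionArgs` is not a Pulsar class); the real command builder passes many more arguments, which are omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical helper appending the extra-function flags to a JavaInstanceStarter command. */
final class ExtraFunctionArgs {
    private ExtraFunctionArgs() {
    }

    static List<String> append(List<String> baseArgs, String extraFunctionJar, String extraFunctionId) {
        List<String> args = new ArrayList<>(baseArgs);
        // Only sinks configured with a preprocessing function carry an extra function jar.
        if (extraFunctionJar != null) {
            args.add("--extra_function_jar");
            args.add(extraFunctionJar);
            args.add("--extra_function_id");
            args.add(Objects.requireNonNull(extraFunctionId, "extraFunctionId"));
        }
        return args;
    }
}
```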
The statefulset spawned by `KubernetesRuntime` needs to be able to download the extra function code via the API. An `extra-function` query parameter will be added to the download function HTTP endpoint:

```java
@Path("/{tenant}/{namespace}/{functionName}/download")
public StreamingOutput downloadFunction(
        @ApiParam(value = "The tenant of functions")
        final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of functions")
        final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of functions")
        final @PathParam("functionName") String functionName,
        @ApiParam(value = "Whether to download the extra-function")
        final @QueryParam("extra-function") boolean extraFunction) {
    // ...
}
```
If `extraFunction` is `true`, the extra function will be returned instead of the sink.
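The choice the endpoint makes can be reduced to picking one of two stored package locations. `DownloadTarget.resolve` is a hypothetical helper that takes the locations as plain strings; how the real handler reads them from `FunctionMetaData` and streams the package is not shown. Treating a request for a missing extra function as a client error is an assumption, since this PIP does not specify that case.

```java
/** Hypothetical sketch of how the download endpoint could pick which package to stream. */
final class DownloadTarget {
    private DownloadTarget() {
    }

    /**
     * @param packageLocation              location of the sink (or function) package
     * @param extraFunctionPackageLocation location of the extra function, or null if none
     * @param extraFunction                value of the extra-function query parameter
     */
    static String resolve(String packageLocation, String extraFunctionPackageLocation, boolean extraFunction) {
        if (!extraFunction) {
            return packageLocation;
        }
        if (extraFunctionPackageLocation == null) {
            // Assumption: asking for a missing extra function is rejected.
            throw new IllegalArgumentException("No extra function is registered for this sink");
        }
        return extraFunctionPackageLocation;
    }
}
```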
The Java admin SDK will have the following methods added:

```java
/**
 * Download Function Code.
 *
 * @param destinationFile
 *            file where data should be downloaded to
 * @param tenant
 *            Tenant name
 * @param namespace
 *            Namespace name
 * @param function
 *            Function name
 * @param extraFunction
 *            Whether to download the extra-function (for sources and sinks)
 * @throws PulsarAdminException
 */
void downloadFunction(String destinationFile, String tenant, String namespace, String function,
        boolean extraFunction) throws PulsarAdminException;

/**
 * Download Function Code asynchronously.
 *
 * @param destinationFile
 *            file where data should be downloaded to
 * @param tenant
 *            Tenant name
 * @param namespace
 *            Namespace name
 * @param function
 *            Function name
 * @param extraFunction
 *            Whether to download the extra-function (for sources and sinks)
 */
CompletableFuture<Void> downloadFunctionAsync(
        String destinationFile, String tenant, String namespace, String function, boolean extraFunction);
```
The parameter `--extra-function` will be added to the admin CLI command `functions download`.
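On the broker API, in `registerSink`/`updateSink`, if a preprocessing function is present in the Sink config, we:

- store the preprocessing function code under the name `{sink name}__sink-function` and set `extraFunctionPackageLocation` to its location;
- update the sink's `functionDetails` with the preprocessing function config (function class name and function `userConfig`).

The extra-function option is added to the `functions download` CLI command (`--extra-function`), the admin SDK and the HTTP API (query parameter `extra-function`); see API changes.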
When the `InstanceConfig` is created, a UUID is set in the `extraFunctionId` field. This field serves as a cache key for the extra function (see API changes).
When the `FunctionActioner` starts the function, if `extraFunctionPackageLocation` is present, the same is done for the extra function as for the connector:

- if the runtime runs the instance locally, the extra function package is downloaded from `extraFunctionPackageLocation` and the `Runtime` is created with the extra package file path and original name (see API changes to `RuntimeFactory::createContainer`);
- if the runtime is externally managed (Kubernetes), the `Runtime` is created with the `extraFunctionPackageLocation` and original name.

Depending on the configured runtime, if there is an extra function file:
- `ThreadRuntime`: the extra function classloader is obtained with the instance's `extraFunctionId` cache key, then this classloader is passed to the `JavaInstanceRunnable`. The `JavaInstanceRunnable` then switches between the connector classloader and the extra function classloader accordingly.
- `ProcessRuntime`: the path to the extra function jar is passed as the `--extra_function_jar` parameter of the `JavaInstanceStarter` command. The `JavaInstanceStarter` then uses it when creating its `ThreadRuntime`.
- `KubernetesRuntime`: a command is added to the statefulset exec command to download the extra function using the `--extra-function` flag of the `functions download` command, and the path to the downloaded jar is passed as the `--extra_function_jar` parameter of the `JavaInstanceStarter` command.

If `sinkConfig` has a `preprocessFunction`, the `LocalRunner` will use the same methods as the broker to get the function file and `functionDetails`, and use them when spawning the `Runtime`.
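The classloader switching done by the `JavaInstanceRunnable` in the `ThreadRuntime` case can be pictured with the common thread-context-classloader pattern: install the extra function's loader before invoking it, then restore the previous loader. This is a minimal sketch assuming that pattern; `ClassLoaderSwitch` is a hypothetical name, and the actual runnable may switch loaders differently.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: run a call with a given context class loader, then restore the previous one. */
final class ClassLoaderSwitch {
    private ClassLoaderSwitch() {
    }

    static <T> T callWith(ClassLoader loader, Callable<T> call) throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return call.call();
        } finally {
            // Restore even if the call throws, so the next stage sees its own loader.
            thread.setContextClassLoader(previous);
        }
    }
}
```

Restoring in `finally` keeps the connector's loader in place for the sink write even when the preprocessing function fails.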
N/A