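Pipeline elements such as data processors and data sinks are a great way to create reusable components that can be part of pipelines. However, creating a pipeline element is not always the best choice, for example when the processing logic is tied to specific data streams that are already connected and does not need to be reusable across pipelines.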
To cover such use cases, we provide StreamPipes Functions. Functions are a great way to define custom processing logic based on previously connected data streams.
Functions can be registered in a similar way to pipeline elements, but define expected input streams at startup time. Functions are started once the corresponding extensions service starts and run until the service is stopped.
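The lifecycle described above can be modelled in a few lines of plain Java. The sketch below is not StreamPipes code: `LifecycleHooks`, `RecordingFunction` and `runService` are hypothetical stand-ins, and only the hook names are borrowed from the StreamPipesFunction skeleton shown later in this section. It assumes a simplified host that starts the function once, forwards events, and stops it once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the lifecycle hooks of a function; not the StreamPipes API.
interface LifecycleHooks {
    void onServiceStarted();
    void onEvent(String payload, String streamId);
    void onServiceStopped();
}

// Records the order in which the hooks are invoked.
class RecordingFunction implements LifecycleHooks {
    final List<String> calls = new ArrayList<>();

    @Override
    public void onServiceStarted() {
        calls.add("started");
    }

    @Override
    public void onEvent(String payload, String streamId) {
        calls.add("event:" + streamId + ":" + payload);
    }

    @Override
    public void onServiceStopped() {
        calls.add("stopped");
    }
}

class LifecycleSketch {
    // Simulates one service run: start the function, deliver events, then stop it.
    static void runService(LifecycleHooks function, List<String[]> events) {
        function.onServiceStarted();
        for (String[] e : events) {
            function.onEvent(e[1], e[0]); // e[0] = stream id, e[1] = payload
        }
        function.onServiceStopped();
    }

    public static void main(String[] args) {
        RecordingFunction fn = new RecordingFunction();
        runService(fn, List.of(
            new String[] {"stream-a", "42"},
            new String[] {"stream-b", "7"}));
        System.out.println(fn.calls);
    }
}
```

In this model, any state a function needs would be set up in the start hook and released in the stop hook, since events only arrive between the two.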
To define a function, create a new extensions service using the Maven Archetypes or use an already existing service.
Functions can be defined by creating a new class which extends the StreamPipesFunction class.
The basic skeleton looks like this:
public class StreamPipesFunctionExample extends StreamPipesFunction {

  @Override
  public FunctionId getFunctionId() {
    return FunctionId.from("my-function-id", 1);
  }

  @Override
  public List<String> requiredStreamIds() {
    return List.of("<id of the required stream>");
  }

  @Override
  public void onServiceStarted(FunctionContext context) {
    // called when the service is started
  }

  @Override
  public void onEvent(Event event, String streamId) {
    // called when an event arrives
  }

  @Override
  public void onServiceStopped() {
    // called when the service is stopped
  }
}
The structure of a function class is easy to understand:
- getFunctionId returns a FunctionId, which defines the id itself along with a version number that can be freely chosen.
- requiredStreamIds returns the IDs of the data streams the function consumes.
- onServiceStarted is called when the service is started and receives a FunctionContext.
- onEvent is called when an event arrives and provides a streamId as a reference to the corresponding stream, which is useful in case multiple data streams are received by the function.
- onServiceStopped is called when the service is stopped.

Functions require a reference to all data streams that should be retrieved by the function. Currently, the only way to get the ID of a stream is by navigating to the Asset Management view in the StreamPipes UI. Create a new asset, click on Edit Asset and open Add Link in the Linked Resources panel. Choose Data Source as link type, select one of the available sources, copy the Resource ID and provide this ID in the requiredStreamIds method.
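When a function lists more than one stream ID, the streamId argument of onEvent is what tells the events apart. The following is a minimal sketch of that routing, written so it runs without StreamPipes on the classpath: a plain `Map<String, Object>` stands in for the StreamPipes Event class, and the stream IDs, field names and the `StreamRoutingSketch` class are made-up placeholders. In a real function, the IDs would be the Resource IDs copied from the UI.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of per-stream handling inside onEvent. A plain Map stands in for the
// StreamPipes Event class, and the stream IDs are made-up placeholders.
class StreamRoutingSketch {
    static final String TEMPERATURE_STREAM = "placeholder-temperature-stream-id";
    static final String PRESSURE_STREAM = "placeholder-pressure-stream-id";

    private final Map<String, Integer> eventCounts = new HashMap<>();
    private double lastTemperature = Double.NaN;
    private double lastPressure = Double.NaN;

    // Mirrors the shape of onEvent(Event event, String streamId) from the skeleton.
    void onEvent(Map<String, Object> event, String streamId) {
        eventCounts.merge(streamId, 1, Integer::sum);
        if (TEMPERATURE_STREAM.equals(streamId)) {
            lastTemperature = ((Number) event.get("temperature")).doubleValue();
        } else if (PRESSURE_STREAM.equals(streamId)) {
            lastPressure = ((Number) event.get("pressure")).doubleValue();
        }
        // Events from any other stream are counted but otherwise ignored.
    }

    int countFor(String streamId) {
        return eventCounts.getOrDefault(streamId, 0);
    }

    double lastTemperature() {
        return lastTemperature;
    }

    double lastPressure() {
        return lastPressure;
    }

    public static void main(String[] args) {
        StreamRoutingSketch fn = new StreamRoutingSketch();
        fn.onEvent(Map.of("temperature", 21.5), TEMPERATURE_STREAM);
        fn.onEvent(Map.of("pressure", 1013), PRESSURE_STREAM);
        fn.onEvent(Map.of("temperature", 22.0), TEMPERATURE_STREAM);
        System.out.println(fn.countFor(TEMPERATURE_STREAM)
            + " temperature events, last value = " + fn.lastTemperature());
    }
}
```

Keeping the stream IDs in named constants makes it easier to return the same values from requiredStreamIds and compare against them in onEvent.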
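The onServiceStarted method provides a function context which offers several convenience methods to work with functions, for example to access the data streams defined in the requiredStreamIds method or to look up information about a specific stream by its streamId.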
Registering a function is easy and can be done in the Init class of the service. For example, given a service definition as illustrated below, simply call registerFunction and provide an instance of your function.
@Override
public SpServiceDefinition provideServiceDefinition() {
  return SpServiceDefinitionBuilder.create("my-service-id",
          "StreamPipes Function Example",
          "",
          8090)
      .registerFunction(new MyExampleFunction())
      .registerMessagingFormats(
          new JsonDataFormatFactory())
      .registerMessagingProtocols(
          new SpNatsProtocolFactory())
      .build();
}
Similar to pipeline elements, functions register at the StreamPipes core. Running functions can be seen in the pipeline view of the user interface under Functions, right below the list of available pipelines. Similar to pipelines, simple metrics, monitoring info and exceptions can be viewed in the Details section of each function.