---
id: extend-sdk-functions
title: "SDK Guide: Functions"
sidebar_label: "SDK: Functions"
---
## Introduction
Pipeline elements such as data processors and data sinks are a great way
to create _reusable_ components that can be part of pipelines.
However, a pipeline element is not always the best choice, in particular when all of the following apply:
* The behaviour of a data processor is bound to a specific input stream _and_
* A data processor doesn't contain any user-defined configuration _and_
* The intended action is fixed or known at build time and the data processor shouldn't be available in the pipeline editor.
To cover such use cases, we provide _StreamPipes Functions_. Functions
are a great way to define custom processing logic based on previously
connected data streams.
Functions can be registered in a similar way to pipeline elements, but define expected input
streams at startup time. Functions are started once the corresponding _extensions service_ starts
and run until the service is stopped.
## Writing a function
:::caution Work in Progress
Functions are currently in preview mode and are not yet recommended for production usage.
APIs are subject to change in a future version.
:::
To define a function, create a new extensions service using the [Maven Archetypes](06_extend-archetypes.md) or use an existing service.
### Skeleton
Functions can be defined by creating a new class which extends the ``StreamPipesFunction`` class.
The basic skeleton looks like this:
```java
public class StreamPipesFunctionExample extends StreamPipesFunction {

  @Override
  public FunctionId getFunctionId() {
    return FunctionId.from("my-function-id", 1);
  }

  @Override
  public List<String> requiredStreamIds() {
    return List.of("<id of the required stream>");
  }

  @Override
  public void onServiceStarted(FunctionContext context) {
    // called when the service is started
  }

  @Override
  public void onEvent(Event event, String streamId) {
    // called when an event arrives
  }

  @Override
  public void onServiceStopped() {
    // called when the service is stopped
  }
}
```
The structure of a function class is easy to understand:
* _getFunctionId_ requires an identifier in the form of a ``FunctionId``, which defines the id itself along with a version number that can be freely chosen.
* _requiredStreamIds_ expects a list of references to data streams that are already available in StreamPipes. See below to learn how to find the id of a stream in StreamPipes.
* _onServiceStarted_ is called once the extensions service is started and can be used to initialize the function.
* _onEvent_ is called every time a new event arrives and provides a ``streamId`` as a reference to the corresponding stream, which is useful in case multiple data streams are received by the function.
* _onServiceStopped_ is called when the extensions service is stopped and can be used to perform any required cleanup.
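
When a function subscribes to several streams, ``onEvent`` has to tell them apart by their ``streamId``. The following is a minimal sketch of that bookkeeping using only the JDK. ``PerStreamCounter`` is a hypothetical helper and not part of the StreamPipes SDK; the idea is to create it in ``onServiceStarted``, call ``record(streamId)`` from ``onEvent`` and call ``clear()`` from ``onServiceStopped``. The thread-safe collections are a conservative assumption, since this guide does not state whether ``onEvent`` may be invoked concurrently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper (not part of the StreamPipes SDK):
// counts how many events were received per stream ID.
class PerStreamCounter {

  private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

  // Intended to be called from onEvent(event, streamId).
  void record(String streamId) {
    counts.computeIfAbsent(streamId, id -> new LongAdder()).increment();
  }

  // Returns the number of events recorded for the stream, or 0 if none were seen.
  long countFor(String streamId) {
    LongAdder adder = counts.get(streamId);
    return adder == null ? 0L : adder.sum();
  }

  // Intended to be called from onServiceStopped() as part of the cleanup.
  void clear() {
    counts.clear();
  }
}
```

Keeping this state in a small class of its own leaves the function class limited to the lifecycle callbacks and makes the counting logic testable without a running extensions service.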
### Getting a stream ID
A function needs a reference to every data stream it should receive.
Currently, the only way to get the ID of a stream is by navigating to the ``Asset Management`` view in the StreamPipes UI.
Create a new asset, click on ``Edit Asset`` and open ``Add Link`` in the _Linked Resources_ panel.
Choose ``Data Source`` as link type, select one of the available sources, copy the ``Resource ID`` and provide this ID in the ``requiredStreamIds`` method.
### Function Context
The ``onServiceStarted`` method receives a function context, which offers several convenience methods to work with functions:
* _getFunctionId_ returns the current function identifier
* _getConfig_ returns a reference to configuration options of the extensions service
* _getClient_ returns a reference to the StreamPipes client for interacting with the REST API
* _getStreams_ returns the data model of all data streams defined in the ``requiredStreamIds`` method.
* _getSchema_ returns the schema of a specific data stream by providing the ``streamId``
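
One possible use of the schema information is a fail-fast check at startup: before any event is processed, verify that the stream actually carries the fields the function relies on. The sketch below shows only that comparison in plain Java. ``SchemaCheck`` and ``missingFields`` are hypothetical names, not part of the StreamPipes SDK, and extracting the field names from the object returned by ``getSchema`` is deliberately left out, because that depends on the SDK version in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper (not part of the StreamPipes SDK).
class SchemaCheck {

  // Returns the required field names that are absent from the schema,
  // in the order in which they were requested.
  static List<String> missingFields(Set<String> schemaFieldNames,
                                    List<String> requiredFieldNames) {
    List<String> missing = new ArrayList<>();
    for (String name : requiredFieldNames) {
      if (!schemaFieldNames.contains(name)) {
        missing.add(name);
      }
    }
    return missing;
  }
}
```

In ``onServiceStarted``, a non-empty result could be logged or turned into an exception, so that a misconfigured stream ID is noticed at startup rather than on the first event. The field names passed in are whatever the function expects, for example an illustrative ``temperature`` field.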
## Registering a function
Functions are registered in the _Init_ class of the service.
For example, given a service definition like the one below, call ``registerFunction`` and
provide an instance of your function.
```java
  @Override
  public SpServiceDefinition provideServiceDefinition() {
    return SpServiceDefinitionBuilder.create("my-service-id",
            "StreamPipes Function Example",
            "",
            8090)
        .registerFunction(new MyExampleFunction())
        .registerMessagingFormats(
            new JsonDataFormatFactory())
        .registerMessagingProtocols(
            new SpNatsProtocolFactory())
        .build();
  }
```
## Metrics & Monitoring
Similar to pipeline elements, functions register at the StreamPipes core.
Running functions can be seen in the pipeline view of the user interface under _Functions_, right below the list of available pipelines.
Similar to pipelines, simple metrics, monitoring info and exceptions can be viewed in the _Details_ section of each function.