docs: blogpost about anomaly detection with ONNX (#160)

* docs: blogpost about anomaly detection with ONNX

* docs: minor improvements
diff --git a/website-v2/blog/2024-03-27-anomaly-detection-with-python-functions.md b/website-v2/blog/2024-03-27-anomaly-detection-with-python-functions.md
new file mode 100644
index 0000000..963e31d
--- /dev/null
+++ b/website-v2/blog/2024-03-27-anomaly-detection-with-python-functions.md
@@ -0,0 +1,269 @@
+---
+title: Anomaly Detection with StreamPipes Functions in Python and ONNX
+author: Tim Bossenmaier
+authorURL: "https://github.com/bossenti"
+authorImageURL: "/img/bossenmaier.png"
+---
+
+
+Apache StreamPipes saves the day when it comes to connecting to data sources in the IIoT world. Want to do more with
+your IIoT data than just analyze it in a dashboard? If so, this blog post is for you! We'll show you how to extract
+historical data from StreamPipes, use it to train a machine learning model, bring the model back to StreamPipes using
+ONNX, and apply the model to live data.
+<center>
+<img class="blog-image" style={{maxWidth:'75%'}} src="/img/blog/2024-03-26/prediction-analysis.png" alt="anomaly-detection"/><br/>
+</center>
+
+<!--truncate-->
+
+## Motivation
+
+With this blog post we want to illustrate how easily one can extract historical IIoT data collected with StreamPipes,
+train a machine learning model on this data, and bring the model back to StreamPipes via the interoperability
+standard [ONNX](https://onnx.ai) to run inference on live data.
+
+A very common use case in the area of IIoT is the detection of anomalies, so we want to tackle this challenge in this
+article as well. We will use data generated by
+the [Machine Data Simulator](https://streampipes.apache.org/docs/pe/org.apache.streampipes.connect.iiot.adapters.simulator.machine/)
+adapter. More specifically, we will focus on the `flowrate` data, which consists of various sensor values coming from a
+water pipe system. Our goal is to keep an eye on the parameter `volume_flow`, which represents the current volume flow
+in cubic meters per second. For this parameter, we want to detect anomalies that could indicate problems such as leaks
+or blockages.
+
+To obtain the data we need, we simply create an instance of the machine data simulator adapter and persist the data in
+the data lake:
+
+![tutorial-preparation](https://raw.githubusercontent.com/apache/streampipes/dev/streampipes-client-python/docs/img/tutorial-preparation.gif)
+
+## Set Up & Prepare Python Client
+
+As a prerequisite, we need to install the StreamPipes Python client and all other required dependencies:
+
+```bash
+pip install git+https://github.com/apache/streampipes.git#subdirectory=streampipes-client-python
+pip install scikit-learn==1.4.0 skl2onnx==1.16.0 onnxruntime==1.17.1
+```
+
+The next step is to configure and initialize an instance of the client.
+
+```python
+import os
+from streampipes.client import StreamPipesClient
+from streampipes.client.config import StreamPipesClientConfig
+from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
+
+os.environ["BROKER-HOST"] = "localhost"
+os.environ["KAFKA-PORT"] = "9094"  # When using Kafka as message broker
+
+config = StreamPipesClientConfig(
+    credential_provider=StreamPipesApiKeyCredentials(
+        username="admin@streampipes.apache.org",
+        api_key="TOKEN",
+    ),
+    host_address="localhost",
+    https_disabled=True,
+    port=80
+)
+
+client = StreamPipesClient(client_config=config)
+```
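+
+To quickly verify that the client is connected and that the persisted `flow-rate` measure is visible, the client offers
+a couple of overview methods. The following is a small optional sketch (assuming the setup above succeeded):
+
+```python
+# prints an overview of the available data streams and data lake measures
+client.describe()
+
+# alternatively, list all data lake measures as a pandas DataFrame
+client.dataLakeMeasureApi.all().to_pandas()
+```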
+
+In case you have never worked with the Python client before and have trouble getting started,
+please have a look at
+our [tutorial](https://streampipes.apache.org/docs/docs/python/latest/tutorials/1-introduction-to-streampipes-python-client/).
+
+If you already have an ONNX model and are only interested in applying it with StreamPipes on a data stream, you can skip
+the following section.
+
+## Model Training with Historic Data
+
+As mentioned above, the aim of our model is to detect anomalies in the `volume_flow` parameter. For this task, we will use
+[Isolation Forests](https://en.wikipedia.org/wiki/Isolation_forest). Please note that the focus of this tutorial is not
+on model training, so please bear with us: the training is deliberately simplified and omits important preparation
+steps such as standardization.
+
+As a first step, let's query the `flowrate` data from the StreamPipes data lake and extract the values of `volume_flow`
+as a feature:
+
+```python
+flowrate_df = client.dataLakeMeasureApi.get("flow-rate").to_pandas()
+X = flowrate_df["volume_flow"].values.reshape(-1, 1).astype("float32")
+```
+
+Next, we can train our model on the historical data:
+
+```python
+from sklearn.ensemble import IsolationForest
+
+model = IsolationForest(contamination=0.01)
+model.fit(X)
+```
+
+The `contamination` parameter specifies the expected proportion of outliers in the data. See
+the [scikit-learn](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html)
+documentation for more information.
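+
+The fitted model can already be used for predictions: `IsolationForest.predict` returns `-1` for anomalies and `1` for
+normal samples. As a quick sanity check, here is a minimal sketch (reusing `model` and `X` from above) that counts how
+many of the historical values are flagged:
+
+```python
+import numpy as np
+
+# -1 marks anomalies, 1 marks normal samples
+labels = model.predict(X)
+
+n_anomalies = int(np.sum(labels == -1))
+print(f"{n_anomalies} of {len(X)} samples are flagged as anomalies ({n_anomalies / len(X):.2%})")
+```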
+
+Here you can see how this simple model performs:
+
+<img src="/img/blog/2024-03-26/prediction-analysis.png"/>
+
+<p></p>
+
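+If you would like to create a similar plot yourself, a minimal matplotlib sketch along the following lines (assuming
+`matplotlib` is installed and reusing `X` and `model` from above) yields a comparable visualization:
+
+```python
+import matplotlib.pyplot as plt
+
+labels = model.predict(X)  # -1 = anomaly, 1 = normal
+
+plt.figure(figsize=(12, 4))
+plt.scatter(range(len(X)), X[:, 0], c=(labels == -1).astype(int), cmap="coolwarm", s=5)
+plt.xlabel("sample index")
+plt.ylabel("volume_flow")
+plt.title("volume_flow with detected anomalies highlighted")
+plt.show()
+```
+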
+This doesn't look too bad, right? Let's continue by converting our model to the ONNX representation.
+
+```python
+from onnxconverter_common import FloatTensorType
+from skl2onnx import to_onnx
+
+model_onnx = to_onnx(
+    model,
+    initial_types=[('input', FloatTensorType([None, X.shape[1]]))],
+    target_opset={'ai.onnx.ml': 3, 'ai.onnx': 15, '': 15}
+)
+
+with open("isolation_forest.onnx", "wb") as f:
+    f.write(model_onnx.SerializeToString())
+```
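+
+As an optional sanity check, we can load the serialized model with `onnxruntime` and compare its predictions against
+the original scikit-learn model. This is a small sketch (reusing `model` and `X` from above); the first model output
+contains the predicted labels:
+
+```python
+import numpy as np
+import onnxruntime as rt
+
+session = rt.InferenceSession("isolation_forest.onnx", providers=rt.get_available_providers())
+input_name = session.get_inputs()[0].name
+output_name = session.get_outputs()[0].name
+
+onnx_labels = session.run([output_name], {input_name: X})[0]
+
+agreement = np.mean(onnx_labels.flatten() == model.predict(X))
+print(f"ONNX and scikit-learn agree on {agreement:.2%} of the samples")
+```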
+
+## Model Inference with Live Data
+
+Thanks to the ONNX interoperability standard, a pre-trained model can be applied to live data streams in StreamPipes
+with very little additional effort.
+
+Interacting with live data from StreamPipes is facilitated through StreamPipes functions. Below, we'll create a Python
+StreamPipes function that leverages an ONNX model to generate predictions for each incoming event, making the results
+accessible as a data stream within StreamPipes for subsequent steps.
+
+So let's create an `ONNXFunction` that is capable of applying a model in ONNX representation to a StreamPipes data
+stream.
+If you'd like to read more details about how functions are defined, refer
+to [our tutorial](https://streampipes.apache.org/docs/docs/python/latest/tutorials/3-getting-live-data-from-the-streampipes-data-stream/).
+
+```python
+import numpy as np
+import onnxruntime as rt
+
+from streampipes.functions.broker.broker_handler import get_broker_description
+from streampipes.functions.streampipes_function import StreamPipesFunction
+from streampipes.functions.utils.data_stream_generator import create_data_stream, RuntimeType
+from streampipes.functions.utils.function_context import FunctionContext
+from streampipes.model.resource import FunctionDefinition, DataStream
+
+from typing import Dict, Any, List
+
+
+class ONNXFunction(StreamPipesFunction):
+
+    def __init__(self, feature_names: List[str], input_stream: DataStream):
+        # output data stream that carries the prediction results back to StreamPipes
+        output_stream = create_data_stream(
+            name="flowrate-prediction",
+            attributes={
+                "is_anomaly": RuntimeType.BOOLEAN.value
+            },
+            broker=get_broker_description(input_stream)
+        )
+
+        # register the output data stream at the function definition passed to the parent class
+        function_definition = FunctionDefinition(
+            consumed_streams=[input_stream.element_id]
+        ).add_output_data_stream(output_stream)
+
+        # instance variables mainly required by the ONNX runtime
+        self.feature_names = feature_names
+        self.input_name = None
+        self.output_name = None
+        self.session = None
+
+        super().__init__(function_definition=function_definition)
+    
+    ...
+```
+
+First, we need to take care of the data stream that sends the predictions from our function back to
+StreamPipes. Thus, we create a dedicated output data stream and provide it with the attributes our output events will
+consist of (a timestamp attribute is always added automatically). This output data stream needs to be registered with the
+function definition, which is then passed to the parent class. Lastly, we define some instance variables that
+are mainly required for the ONNX runtime.
+
+Next, we need to ensure that an ONNX runtime session is created on startup. Thus, we create an `InferenceSession`
+and retrieve the names of the model's input and output:
+
+```python
+class ONNXFunction(StreamPipesFunction):
+  
+    ...
+    
+    def onServiceStarted(self, context: FunctionContext) -> None:
+        self.session = rt.InferenceSession(
+            path_or_bytes="isolation_forest.onnx",
+            providers=rt.get_available_providers(),
+        )
+        self.input_name = self.session.get_inputs()[0].name
+        self.output_name = self.session.get_outputs()[0].name
+        
+    ...
+
+```
+
+Lastly, we need to implement the inference logic that is applied to every incoming event.
+If you have brought your own model, you may need to adapt lines `10-13`:
+
+```python {10-13} showLineNumbers
+class ONNXFunction(StreamPipesFunction):
+
+    ...
+
+    def onEvent(self, event: Dict[str, Any], streamId: str) -> None:
+        feature_vector = []
+        for feature in self.feature_names:
+            feature_vector.append(event[feature])
+
+        prediction = self.session.run(
+            [self.output_name],
+            {self.input_name: np.expand_dims(np.array(feature_vector), axis=0).astype("float32")}
+        )[0]
+
+        output = {
+            "is_anomaly": int(prediction[0]) == -1
+        }
+
+        self.add_output(
+            stream_id=self.function_definition.get_output_stream_ids()[0],
+            event=output
+        )
+
+    def onServiceStopped(self) -> None:
+        pass
+```
+
+Having the function code in place, we can register and start the function as follows:
+
+```python
+from streampipes.functions.registration import Registration
+from streampipes.functions.function_handler import FunctionHandler
+
+# retrieve the data stream created by the machine data simulator adapter
+stream = [
+    stream
+    for stream
+    in client.dataStreamApi.all()
+    if stream.name == "flow-rate"
+][0]
+
+function = ONNXFunction(
+    feature_names=["volume_flow"],
+    input_stream=stream
+)
+
+registration = Registration()
+registration.register(function)
+function_handler = FunctionHandler(registration, client)
+function_handler.initializeFunctions()
+```
+
+We can now access the live values of the prediction in the StreamPipes UI, e.g., in the pipeline editor.
+
+<img src="/img/blog/2024-03-26/tutorial-prediction-data-stream.png"/>
+
+<p></p>
+
+From here on you can further work with the prediction events in StreamPipes, e.g., by sending notifications
+to [MS Teams](https://streampipes.apache.org/docs/next/pe/org.apache.streampipes.sinks.notifications.jvm.msteams/).
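+
+When you are done experimenting, the function can be stopped again by disconnecting the function handler. This is a
+small sketch assuming the `disconnect` method of the Python client's `FunctionHandler`:
+
+```python
+# stop all registered functions and close the connection to the message broker again
+function_handler.disconnect()
+```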
diff --git a/website-v2/static/img/blog/2024-03-26/prediction-analysis.png b/website-v2/static/img/blog/2024-03-26/prediction-analysis.png
new file mode 100644
index 0000000..e442bda
--- /dev/null
+++ b/website-v2/static/img/blog/2024-03-26/prediction-analysis.png
Binary files differ
diff --git a/website-v2/static/img/blog/2024-03-26/tutorial-prediction-data-stream.png b/website-v2/static/img/blog/2024-03-26/tutorial-prediction-data-stream.png
new file mode 100644
index 0000000..071a5f2
--- /dev/null
+++ b/website-v2/static/img/blog/2024-03-26/tutorial-prediction-data-stream.png
Binary files differ