| { |
| "cells": [ |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "# Using Online Machine Learning on a StreamPipes data stream\n", |
| "The last tutorial ([Getting live data from the StreamPipes data stream](../3-getting-live-data-from-the-streampipes-data-stream)) showed how we can connect to a data stream, and it would be possible to use Online Machine Learning with this approach and train a model with the incoming events at the `onEvent` method. However, the StreamPipes client also provides an easier way to do this with the use of the [River library](https://riverml.xyz) for Online Machine Learning. We will have a look at this now." |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 1, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from streampipes.client import StreamPipesClient\n", |
| "from streampipes.client.config import StreamPipesClientConfig\n", |
| "from streampipes.client.credential_provider import StreamPipesApiKeyCredentials" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "outputs": [], |
| "source": [ |
| "# you can install all required dependecies for this tutorial by executing the following command\n", |
| "%pip install river streampipes" |
| ], |
| "metadata": { |
| "collapsed": false |
| } |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 2, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import os\n", |
| "\n", |
| "os.environ[\"USER\"] = \"admin@streampipes.apache.org\"\n", |
| "os.environ[\"API-KEY\"] = \"XXX\"\n", |
| "os.environ[\"BROKER-HOST\"] = \"localhost\"" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 3, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "2023-01-27 16:04:24,784 - streampipes.client.client - [INFO] - [client.py:128] [_set_up_logging] - Logging successfully initialized with logging level INFO.\n" |
| ] |
| } |
| ], |
| "source": [ |
| "client_config = StreamPipesClientConfig(\n", |
| " credential_provider=StreamPipesApiKeyCredentials.from_env(username_env=\"USER\", api_key_env=\"API-KEY\"),\n", |
| " host_address=\"localhost\",\n", |
| " port=80,\n", |
| " https_disabled=True,\n", |
| ")\n", |
| "client = StreamPipesClient(client_config=client_config)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 4, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "2023-01-27 16:04:28,212 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:163] [_make_request] - Successfully retrieved all resources.\n" |
| ] |
| }, |
| { |
| "data": { |
| "text/html": [ |
| "<div>\n", |
| "<style scoped>\n", |
| " .dataframe tbody tr th:only-of-type {\n", |
| " vertical-align: middle;\n", |
| " }\n", |
| "\n", |
| " .dataframe tbody tr th {\n", |
| " vertical-align: top;\n", |
| " }\n", |
| "\n", |
| " .dataframe thead th {\n", |
| " text-align: right;\n", |
| " }\n", |
| "</style>\n", |
| "<table border=\"1\" class=\"dataframe\">\n", |
| " <thead>\n", |
| " <tr style=\"text-align: right;\">\n", |
| " <th></th>\n", |
| " <th>element_id</th>\n", |
| " <th>name</th>\n", |
| " <th>description</th>\n", |
| " <th>icon_url</th>\n", |
| " <th>app_id</th>\n", |
| " <th>includes_assets</th>\n", |
| " <th>includes_locales</th>\n", |
| " <th>internally_managed</th>\n", |
| " <th>measurement_object</th>\n", |
| " <th>index</th>\n", |
| " <th>...</th>\n", |
| " <th>dom</th>\n", |
| " <th>rev</th>\n", |
| " <th>num_transport_protocols</th>\n", |
| " <th>num_measurement_capability</th>\n", |
| " <th>num_application_links</th>\n", |
| " <th>num_included_assets</th>\n", |
| " <th>num_connected_to</th>\n", |
| " <th>num_category</th>\n", |
| " <th>num_event_properties</th>\n", |
| " <th>num_included_locales</th>\n", |
| " </tr>\n", |
| " </thead>\n", |
| " <tbody>\n", |
| " <tr>\n", |
| " <th>0</th>\n", |
| " <td>sp:spdatastream:xboBFK</td>\n", |
| " <td>Test</td>\n", |
| " <td></td>\n", |
| " <td>None</td>\n", |
| " <td>None</td>\n", |
| " <td>False</td>\n", |
| " <td>False</td>\n", |
| " <td>True</td>\n", |
| " <td>None</td>\n", |
| " <td>0</td>\n", |
| " <td>...</td>\n", |
| " <td>None</td>\n", |
| " <td>5-558c861debc745e1ebae29a266a8bdb9</td>\n", |
| " <td>1</td>\n", |
| " <td>0</td>\n", |
| " <td>0</td>\n", |
| " <td>0</td>\n", |
| " <td>0</td>\n", |
| " <td>0</td>\n", |
| " <td>7</td>\n", |
| " <td>0</td>\n", |
| " </tr>\n", |
| " <tr>\n", |
| " <th>1</th>\n", |
| " <td>urn:streampipes.apache.org:eventstream:Wgyrse</td>\n", |
| " <td>Test File</td>\n", |
| " <td></td>\n", |
| " <td>None</td>\n", |
| " <td>None</td>\n", |
| " <td>False</td>\n", |
| " <td>False</td>\n", |
| " <td>True</td>\n", |
| " <td>None</td>\n", |
| " <td>0</td>\n", |
| " <td>...</td>\n", |
| " <td>None</td>\n", |
| " <td>4-66548b6b84287011b7cec0876ef82baf</td>\n", |
| " <td>1</td>\n", |
| " <td>0</td>\n", |
| " <td>0</td>\n", |
| " <td>0</td>\n", |
| " <td>0</td>\n", |
| " <td>0</td>\n", |
| " <td>2</td>\n", |
| " <td>0</td>\n", |
| " </tr>\n", |
| " </tbody>\n", |
| "</table>\n", |
| "<p>2 rows × 22 columns</p>\n", |
| "</div>" |
| ], |
| "text/plain": [ |
| " element_id name description \\\n", |
| "0 sp:spdatastream:xboBFK Test \n", |
| "1 urn:streampipes.apache.org:eventstream:Wgyrse Test File \n", |
| "\n", |
| " icon_url app_id includes_assets includes_locales internally_managed \\\n", |
| "0 None None False False True \n", |
| "1 None None False False True \n", |
| "\n", |
| " measurement_object index ... dom rev \\\n", |
| "0 None 0 ... None 5-558c861debc745e1ebae29a266a8bdb9 \n", |
| "1 None 0 ... None 4-66548b6b84287011b7cec0876ef82baf \n", |
| "\n", |
| " num_transport_protocols num_measurement_capability num_application_links \\\n", |
| "0 1 0 0 \n", |
| "1 1 0 0 \n", |
| "\n", |
| " num_included_assets num_connected_to num_category num_event_properties \\\n", |
| "0 0 0 0 7 \n", |
| "1 0 0 0 2 \n", |
| "\n", |
| " num_included_locales \n", |
| "0 0 \n", |
| "1 0 \n", |
| "\n", |
| "[2 rows x 22 columns]" |
| ] |
| }, |
| "execution_count": 4, |
| "metadata": {}, |
| "output_type": "execute_result" |
| } |
| ], |
| "source": [ |
| "client.dataStreamApi.all().to_pandas()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## How to use Online Machine Learning with StreamPipes\n", |
| "After we configured the client as usual, we can start with the new part. \n", |
| "The approach is straight forward and you can start with the ML part in just 3 steps:\n", |
| "1. Create a pipeline with River and insert the preprocessing steps and model of your choice.\n", |
| "2. Configure the `OnlineML` wrapper to fit to your model and insert the client and required data stream ids.\n", |
| "3. Start the wrapper and let the learning begin.\n", |
| "\n", |
| "A StreamPipesFunction is then started, which trains the model for each new event. It also creates an output data stream which will send the prediction of the model back to StreamPipes. This output stream can be seen when creating a new pipeline and can be used like every other data source. So you can use it in a pipeline and save the predictions in a Data Lake.\n", |
| "You can also stop and start the training with the method `set_learning`. To stop the whole function use the `stop` methode and if you want to delete the output stream entirely, you can go to the `Pipeline Element Installer` in StreamPipes and uninstall it. \n", |
| " \n", |
| "Now let's take a look at some examples. If you want to execute the examples below you have to create an adapter for the `Machine Data Simulator`, select the `flowrate` sensor and insert the stream id of this stream." |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## KMeans" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 5, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "2023-01-27 16:04:35,599 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:163] [_make_request] - Successfully retrieved all resources.\n", |
| "2023-01-27 16:04:35,599 - streampipes.functions.function_handler - [INFO] - [function_handler.py:64] [initializeFunctions] - Create output data stream \"sp:spdatastream:cwKPoo\" for the function \"65cf8b86-bcdf-433e-a1c7-3e920eab55d0\"\n", |
| "2023-01-27 16:04:37,766 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:163] [_make_request] - Successfully retrieved all resources.\n", |
| "2023-01-27 16:04:37,767 - streampipes.functions.function_handler - [INFO] - [function_handler.py:78] [initializeFunctions] - Using NatsBroker for RiverFunction\n" |
| ] |
| }, |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "2023-01-27 16:04:37,791 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:48] [_makeConnection] - Connected to NATS at localhost:4222\n", |
| "2023-01-27 16:04:37,791 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:48] [_makeConnection] - Connected to NATS at localhost:4222\n", |
| "2023-01-27 16:04:37,792 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:58] [createSubscription] - Subscribed to stream: sp:spdatastream:xboBFK\n" |
| ] |
| } |
| ], |
| "source": [ |
| "from river import cluster, compose, preprocessing\n", |
| "from streampipes.function_zoo.river_function import OnlineML\n", |
| "from streampipes.functions.utils.data_stream_generator import RuntimeType\n", |
| "\n", |
| "k_means = compose.Pipeline(\n", |
| " (\"drop_features\", compose.Discard(\"sensorId\", \"timestamp\")),\n", |
| " (\"scale\", preprocessing.StandardScaler()),\n", |
| " (\"k_means\", cluster.KMeans(n_clusters=2)),\n", |
| ")\n", |
| "\n", |
| "clustering = OnlineML(\n", |
| " client=client, stream_ids=[\"sp:spdatastream:xboBFK\"], model=k_means, prediction_type=RuntimeType.INTEGER.value\n", |
| ")\n", |
| "clustering.start()" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 6, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "clustering.set_learning(False)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [ |
| { |
| "name": "stdout", |
| "output_type": "stream", |
| "text": [ |
| "2023-01-27 16:04:57,303 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:82] [disconnect] - Stopped connection to stream: sp:spdatastream:xboBFK\n", |
| "2023-01-27 16:04:57,304 - streampipes.functions.broker.nats_broker - [INFO] - [nats_broker.py:82] [disconnect] - Stopped connection to stream: sp:spdatastream:cwKPoo\n" |
| ] |
| } |
| ], |
| "source": [ |
| "clustering.stop()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## HoeffdingTreeRegressor" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "from river import cluster, compose, preprocessing, tree\n", |
| "from streampipes.function_zoo.river_function import OnlineML\n", |
| "from streampipes.functions.utils.data_stream_generator import RuntimeType\n", |
| "\n", |
| "hoeffding_tree = compose.Pipeline(\n", |
| " (\"drop_features\", compose.Discard(\"sensorId\", \"timestamp\")),\n", |
| " (\"hoeffding_tree\", tree.HoeffdingTreeRegressor(grace_period=5)),\n", |
| ")\n", |
| "\n", |
| "\n", |
| "def draw_tree(self, event, streamId):\n", |
| " \"\"\"Draw the tree and save the image.\"\"\"\n", |
| " if self.learning:\n", |
| " if self.model[1].n_nodes != None:\n", |
| " self.model[1].draw().render(\"hoeffding_tree\", format=\"png\", cleanup=True)\n", |
| "\n", |
| "\n", |
| "def save_model(self):\n", |
| " \"\"\"Save the trained model.\"\"\"\n", |
| " with open(\"hoeffding_tree.pkl\", \"wb\") as f:\n", |
| " pickle.dump(self.model, f)\n", |
| "\n", |
| "\n", |
| "regressor = OnlineML(\n", |
| " client=client,\n", |
| " stream_ids=[\"sp:spdatastream:xboBFK\"],\n", |
| " model=hoeffding_tree,\n", |
| " prediction_type=RuntimeType.FLOAT.value,\n", |
| " supervised=True,\n", |
| " target_label=\"temperature\",\n", |
| " on_event=draw_tree,\n", |
| " on_stop=save_model,\n", |
| ")\n", |
| "regressor.start()" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 9, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "regressor.set_learning(False)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "regressor.stop()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "## DecisionTreeClassifier" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "import pickle\n", |
| "from river import cluster, compose, preprocessing, tree\n", |
| "from streampipes.function_zoo.river_function import OnlineML\n", |
| "from streampipes.functions.utils.data_stream_generator import RuntimeType\n", |
| "\n", |
| "decision_tree = compose.Pipeline(\n", |
| " (\"drop_features\", compose.Discard(\"sensorId\", \"timestamp\")),\n", |
| " (\"decision_tree\", tree.ExtremelyFastDecisionTreeClassifier(grace_period=5)),\n", |
| ")\n", |
| "\n", |
| "\n", |
| "def draw_tree(self, event, streamId):\n", |
| " \"\"\"Draw the tree and save the image.\"\"\"\n", |
| " if self.learning:\n", |
| " if self.model[1].n_nodes != None:\n", |
| " self.model[1].draw().render(\"decicion_tree\", format=\"png\", cleanup=True)\n", |
| "\n", |
| "\n", |
| "def save_model(self):\n", |
| " \"\"\"Save the trained model.\"\"\"\n", |
| " with open(\"decision_tree.pkl\", \"wb\") as f:\n", |
| " pickle.dump(self.model, f)\n", |
| "\n", |
| "\n", |
| "classifier = OnlineML(\n", |
| " client=client,\n", |
| " stream_ids=[\"sp:spdatastream:xboBFK\"],\n", |
| " model=decision_tree,\n", |
| " prediction_type=RuntimeType.BOOLEAN.value,\n", |
| " supervised=True,\n", |
| " target_label=\"sensor_fault_flags\",\n", |
| " on_event=draw_tree,\n", |
| " on_stop=save_model,\n", |
| ")\n", |
| "classifier.start()" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": 12, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "classifier.set_learning(False)" |
| ] |
| }, |
| { |
| "cell_type": "code", |
| "execution_count": null, |
| "metadata": {}, |
| "outputs": [], |
| "source": [ |
| "classifier.stop()" |
| ] |
| }, |
| { |
| "attachments": {}, |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [ |
| "How do you like this tutorial?\n", |
| "We hope you like it and would love to receive some feedback from you.\n", |
| "Just go to our [GitHub discussion page](https://github.com/apache/streampipes/discussions) and let us know your impression.\n", |
| "We'll read and react to them all, we promise!" |
| ] |
| }, |
| { |
| "cell_type": "markdown", |
| "metadata": {}, |
| "source": [] |
| } |
| ], |
| "metadata": { |
| "kernelspec": { |
| "display_name": "Python 3", |
| "language": "python", |
| "name": "python3" |
| }, |
| "language_info": { |
| "codemirror_mode": { |
| "name": "ipython", |
| "version": 3 |
| }, |
| "file_extension": ".py", |
| "mimetype": "text/x-python", |
| "name": "python", |
| "nbconvert_exporter": "python", |
| "pygments_lexer": "ipython3", |
| "version": "3.10.7" |
| }, |
| "license": "https://www.apache.org/licenses/LICENSE-2.0", |
| "orig_nbformat": 4, |
| "vscode": { |
| "interpreter": { |
| "hash": "5b0c980f854fedc946bf92df903f8f9d5dc7805e8fffb7a1ceb21db6f8b983e3" |
| } |
| } |
| }, |
| "nbformat": 4, |
| "nbformat_minor": 2 |
| } |