blob: f99ae1c549c73e1cad51c86373f8d0d7f242e3dd [file] [log] [blame]
<!doctype html>
<html lang="en" dir="ltr" class="blog-wrapper blog-post-page plugin-blog plugin-id-default">
<head>
<meta charset="UTF-8">
<meta name="generator" content="Docusaurus v2.4.1">
<title data-rh="true">Anomaly Detection with StreamPipes Functions in Python and ONNX | Apache StreamPipes</title><meta data-rh="true" name="viewport" content="width=device-width,initial-scale=1"><meta data-rh="true" name="twitter:card" content="summary_large_image"><meta data-rh="true" property="og:image" content="https://streampipes.apache.org/img/favicon.png"><meta data-rh="true" name="twitter:image" content="https://streampipes.apache.org/img/favicon.png"><meta data-rh="true" property="og:url" content="https://streampipes.apache.org/blog/2024/03/27/anomaly-detection-with-python-functions/"><meta data-rh="true" name="docusaurus_locale" content="en"><meta data-rh="true" name="docusaurus_tag" content="default"><meta data-rh="true" name="docsearch:language" content="en"><meta data-rh="true" name="docsearch:docusaurus_tag" content="default"><meta data-rh="true" property="og:title" content="Anomaly Detection with StreamPipes Functions in Python and ONNX | Apache StreamPipes"><meta data-rh="true" name="description" content="Apache StreamPipes saves the day when it comes to connecting to data sources in the IIoT world. Want to do more with"><meta data-rh="true" property="og:description" content="Apache StreamPipes saves the day when it comes to connecting to data sources in the IIoT world. Want to do more with"><meta data-rh="true" property="og:type" content="article"><meta data-rh="true" property="article:published_time" content="2024-03-27T00:00:00.000Z"><meta data-rh="true" property="article:author" content="https://github.com/bossenti"><link data-rh="true" rel="icon" href="/img/favicon.png"><link data-rh="true" rel="canonical" href="https://streampipes.apache.org/blog/2024/03/27/anomaly-detection-with-python-functions/"><link data-rh="true" rel="alternate" href="https://streampipes.apache.org/blog/2024/03/27/anomaly-detection-with-python-functions/" hreflang="en"><link data-rh="true" rel="alternate" href="https://streampipes.apache.org/blog/2024/03/27/anomaly-detection-with-python-functions/" hreflang="x-default"><link rel="alternate" type="application/rss+xml" href="/blog/rss.xml" title="Apache StreamPipes RSS Feed">
<link rel="alternate" type="application/atom+xml" href="/blog/atom.xml" title="Apache StreamPipes Atom Feed">
<link rel="preconnect" href="//analytics.apache.org/">
<script>var _paq=window._paq=window._paq||[];_paq.push(["disableCookies"]),_paq.push(["trackPageView"]),_paq.push(["enableLinkTracking"]),function(){var a="//analytics.apache.org/";_paq.push(["setTrackerUrl",a+"matomo.php"]),_paq.push(["setSiteId","35"]);var e=document,p=e.createElement("script"),t=e.getElementsByTagName("script")[0];p.type="text/javascript",p.async=!0,p.src=a+"matomo.js",t.parentNode.insertBefore(p,t)}()</script>
<link rel="stylesheet" href="/css/slick.min.css">
<link rel="stylesheet" href="/css/slick-theme.min.css">
<link rel="stylesheet" href="/css/fonts.css">
<link rel="stylesheet" href="/css/admonition.css">
<link rel="stylesheet" href="/css/custom.css">
<link rel="stylesheet" href="/css/code-block-buttons.css">
<link rel="stylesheet" href="/css/all.min.css">
<link rel="stylesheet" href="/css/bootstrap.min.css">
<link rel="stylesheet" href="/css/buttons.css">
<link rel="stylesheet" href="/css/team.css">
<link rel="stylesheet" href="/css/custom-website.css">
<link rel="stylesheet" href="/css/page-banner.css">
<link rel="stylesheet" href="/css/responsive.css">
<script src="/js/jquery-3.3.1.min.js"></script>
<script src="/js/buttons.js"></script>
<script src="/js/slick.min.js"></script>
<script src="/js/custom.js"></script>
<script src="/js/clipboard.min.js"></script>
<script src="/js/code-block-buttons.js"></script><link rel="stylesheet" href="/assets/css/styles.a4107cef.css">
<link rel="preload" href="/assets/js/runtime~main.d38cef4e.js" as="script">
<link rel="preload" href="/assets/js/main.6ee83347.js" as="script">
</head>
<body class="navigation-with-keyboard">
<script>!function(){function t(t){document.documentElement.setAttribute("data-theme",t)}var e=function(){var t=null;try{t=new URLSearchParams(window.location.search).get("docusaurus-theme")}catch(t){}return t}()||function(){var t=null;try{t=localStorage.getItem("theme")}catch(t){}return t}();t(null!==e?e:"light")}(),document.documentElement.setAttribute("data-announcement-bar-initially-dismissed",function(){try{return"true"===localStorage.getItem("docusaurus.announcement.dismiss")}catch(t){}return!1}())</script><div id="__docusaurus">
<div role="region" aria-label="Skip to main content"><a class="skipToContent_fXgn" href="#__docusaurus_skipToContent_fallback">Skip to main content</a></div><div class="announcementBar_mb4j" style="background-color:var(--color-accent);color:white" role="banner"><div class="content_knG7 announcementBarContent_xLdY">Apache StreamPipes 0.97.0 is available! ⭐️</div></div><nav aria-label="Main" class="navbar navbar--fixed-top"><div class="navbar__inner"><div class="navbar__items"><button aria-label="Toggle navigation bar" aria-expanded="false" class="navbar__toggle clean-btn" type="button"><svg width="30" height="30" viewBox="0 0 30 30" aria-hidden="true"><path stroke="currentColor" stroke-linecap="round" stroke-miterlimit="10" stroke-width="2" d="M4 7h22M4 15h22M4 23h22"></path></svg></button><a class="navbar__brand" href="/"><div class="navbar__logo"><img src="/img/sp-logo-color.png" alt="Apache StreamPipes" class="themedImage_ToTc themedImage--light_HNdA"><img src="/img/sp-logo-color.png" alt="Apache StreamPipes" class="themedImage_ToTc themedImage--dark_i4oU"></div></a></div><div class="navbar__items navbar__items--right"><a aria-current="page" class="navbar__item navbar__link navbar__link--active" href="/">Home</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Docs</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/docs/user-guide-introduction/">Apache StreamPipes - User Guide</a></li><li><a href="https://streampipes.apache.org/docs/docs/python/latest/" target="_blank" rel="noopener noreferrer" class="dropdown__link">Apache StreamPipes - Python</a></li><li><a class="dropdown__link" href="/docs/faq-common-problems/">FAQ</a></li></ul></div><a class="navbar__item navbar__link" href="/download/">Download</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Resources</a><ul class="dropdown__menu"><li><a href="https://github.com/apache/streampipes/issues" target="_blank" rel="noopener noreferrer" class="dropdown__link">Issue Tracker<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="https://github.com/apache/streampipes/discussions" target="_blank" rel="noopener noreferrer" class="dropdown__link">Github Discussions Support<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="http://cwiki.apache.org/confluence/display/STREAMPIPES" target="_blank" rel="noopener noreferrer" class="dropdown__link">Developer wiki<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a class="dropdown__link" href="/resources/slides-videos/">Slides &amp; Videos</a></li></ul></div><a aria-current="page" class="navbar__item navbar__link navbar__link--active" href="/blog/">Blog</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Community</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/community/mailing-lists/">Mailing Lists</a></li><li><a class="dropdown__link" href="/community/team/">Team</a></li><li><a class="dropdown__link" href="/community/get-involved/">Get involved</a></li><li><a class="dropdown__link" href="/community/talks-events/">Talks &amp; Events</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Apache</a><ul class="dropdown__menu"><li><a href="https://www.apache.org/" target="_blank" rel="noopener noreferrer" class="dropdown__link">Apache Software Foundation<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="https://www.apache.org/foundation/how-it-works.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">How Apache Works<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="https://www.apache.org/licenses" target="_blank" rel="noopener noreferrer" class="dropdown__link">License<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="https://www.apache.org/security" target="_blank" rel="noopener noreferrer" class="dropdown__link">Security<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Sponsoring Apache<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Thanks<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li></ul></div><a href="https://github.com/apache/streampipes" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link header-link h-github" aria-label="Apache StreamPipes Github"></a><a href="https://linkedin.com/company/apache-streampipes" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link header-link h-linkedin" aria-label="Apache StreamPipes LinkedIn"></a><a href="https://twitter.com/streampipes" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link header-link h-twitter" aria-label="Apache StreamPipes Twitter"></a><div class="searchBox_ZlJk"></div></div></div><div role="presentation" class="navbar-sidebar__backdrop"></div></nav><div id="__docusaurus_skipToContent_fallback" class="main-wrapper mainWrapper_z2l0"><div class="container margin-vert--lg"><div class="row"><aside class="col col--3"><nav class="sidebar_re4s thin-scrollbar" aria-label="Blog recent posts navigation"><div class="sidebarItemTitle_pO2u margin-bottom--md">Recent posts</div><ul class="sidebarItemList_Yudw clean-list"><li class="sidebarItem__DBe"><a class="sidebarItemLink_mo7H" href="/blog/2025/06/11/oi4-adapter/">From Sensor to Insight - Rapid IIoT Integration for IO-Link Devices with Apache StreamPipes</a></li><li class="sidebarItem__DBe"><a class="sidebarItemLink_mo7H" href="/blog/2025/05/20/opc-ua-certificates/">Secure OPC-UA Integration with Apache StreamPipes: A Comprehensive Guide</a></li><li class="sidebarItem__DBe"><a class="sidebarItemLink_mo7H" href="/blog/2025/04/30/usage-based-maintanence/">Usage-Based Maintenance with Apache StreamPipes</a></li><li class="sidebarItem__DBe"><a class="sidebarItemLink_mo7H" href="/blog/2025/04/28/prompt-processor/">Bringing LLM Power to Every Pipeline – The Multi-Model Prompt Processor</a></li><li class="sidebarItem__DBe"><a class="sidebarItemLink_mo7H" href="/blog/2025/04/09/javascript-processor/">How to Use the JavaScript Evaluator Processor</a></li></ul></nav></aside><main class="col col--7" itemscope="" itemtype="http://schema.org/Blog"><article itemprop="blogPost" itemscope="" itemtype="http://schema.org/BlogPosting"><header><h1 class="title_f1Hy" itemprop="headline">Anomaly Detection with StreamPipes Functions in Python and ONNX</h1><div class="container_mt6G margin-vert--md"><time datetime="2024-03-27T00:00:00.000Z" itemprop="datePublished">March 27, 2024</time> · <!-- -->6 min read</div><div class="margin-top--md margin-bottom--sm row"><div class="col col--6 authorCol_Hf19"><div class="avatar margin-bottom--sm"><a href="https://github.com/bossenti" target="_blank" rel="noopener noreferrer" class="avatar__photo-link"><img class="avatar__photo" src="/img/bossenmaier.png" alt="Tim Bossenmaier"></a><div class="avatar__intro" itemprop="author" itemscope="" itemtype="https://schema.org/Person"><div class="avatar__name"><a href="https://github.com/bossenti" target="_blank" rel="noopener noreferrer" itemprop="url"><span itemprop="name">Tim Bossenmaier</span></a></div></div></div></div></div></header><div id="__blog-post-container" class="markdown" itemprop="articleBody"><p>Apache StreamPipes saves the day when it comes to connecting to data sources in the IIoT world. Want to do more with
your IIoT data than just analyze it in a dashboard? If so, this blog post is for you! We&#x27;ll show you how to extract
historical data from StreamPipes, use it to train a machine learning model, bring the model back to StreamPipes using
ONNX, and apply the model to live data.</p><center><img loading="lazy" class="blog-image" style="max-width:75%" src="/img/blog/2024-03-26/prediction-analysis.png" alt="anomaly-detection" class="img_ev3q"><br></center><h2 class="anchor anchorWithStickyNavbar_LWe7" id="motivation">Motivation<a href="#motivation" class="hash-link" aria-label="Direct link to Motivation" title="Direct link to Motivation"></a></h2><p>With this blogpost we want to illustrate how one can easily extract historical IIoT data collected with StreamPipes,
train a machine learning model on this data and bringing the model back to StreamPipes with the interoperability
standard <a href="https://onnx.ai" target="_blank" rel="noopener noreferrer">ONNX</a> to make inference in live data.</p><p>A very common use case in the area of IIoT is the detection of anomalies, so we want to tackle this challenge in this
article as well. We will use data generated by
the <a href="https://streampipes.apache.org/docs/pe/org.apache.streampipes.connect.iiot.adapters.simulator.machine/" target="_blank" rel="noopener noreferrer">Machine Data Simulator</a>
adapter. More specifically, we will focus on the <code>flowrate</code> data, which consists of various sensor values coming from a
water pipe system. Our goal is to keep an eye on the parameter <code>volume_flow</code>, which represents the current volume flow
in
cubic meters/second. For this parameter, we want to detect anomalies that could indicate problems such as leaks,
blockages, etc.</p><p>To get the concerned data, we simply need to create an instance of the machine data simulator and persist the data in
the data lake:</p><p><img loading="lazy" src="https://raw.githubusercontent.com/apache/streampipes/dev/streampipes-client-python/docs/img/tutorial-preparation.gif" alt="tutorial-preparation" class="img_ev3q"></p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="set-up--prepare-python-client">Set Up &amp; Prepare Python Client<a href="#set-up--prepare-python-client" class="hash-link" aria-label="Direct link to Set Up &amp; Prepare Python Client" title="Direct link to Set Up &amp; Prepare Python Client"></a></h2><p>As a prerequisite, we need to install the StreamPipes Python client and all other dependencies,</p><div class="language-bash codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-bash codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token plain">pip </span><span class="token function" style="color:rgb(130, 170, 255)">install</span><span class="token plain"> git+https://github.com/apache/streampipes.git</span><span class="token comment" style="color:rgb(105, 112, 152);font-style:italic">#subdirectory=streampipes-client-python</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">pip </span><span class="token function" style="color:rgb(130, 170, 255)">install</span><span class="token plain"> scikit-learn</span><span class="token operator" style="color:rgb(137, 221, 255)">==</span><span class="token number" style="color:rgb(247, 140, 108)">1.4</span><span class="token plain">.0 </span><span class="token assign-left variable" style="color:rgb(191, 199, 213)">skl2onnx</span><span class="token operator" style="color:rgb(137, 221, 255)">==</span><span class="token number" style="color:rgb(247, 140, 108)">1.16</span><span class="token plain">.0 </span><span class="token assign-left variable" style="color:rgb(191, 199, 213)">onnxruntime</span><span class="token operator" style="color:rgb(137, 221, 255)">==</span><span class="token number" style="color:rgb(247, 140, 108)">1.17</span><span class="token plain">.1</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>The next step is to configure and initialize an instance of the client.</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> os</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">client </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> StreamPipesClient</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">client</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">config </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> StreamPipesClientConfig</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">client</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">credential_provider </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> StreamPipesApiKeyCredentials</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">os</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">environ</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token string" style="color:rgb(195, 232, 141)">&quot;BROKER-HOST&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">&quot;localhost&quot;</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">os</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">environ</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token string" style="color:rgb(195, 232, 141)">&quot;KAFKA-PORT&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">&quot;9094&quot;</span><span class="token plain"> </span><span class="token comment" style="color:rgb(105, 112, 152);font-style:italic"># When using Kafka as message broker</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">config </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> StreamPipesClientConfig</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> credential_provider</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">StreamPipesApiKeyCredentials</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> username</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token string" style="color:rgb(195, 232, 141)">&quot;admin@streampipes.apache.org&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> api_key</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token string" style="color:rgb(195, 232, 141)">&quot;TOKEN&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> host_address</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token string" style="color:rgb(195, 232, 141)">&quot;localhost&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> https_disabled</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token boolean" style="color:rgb(255, 88, 116)">True</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> port</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token number" style="color:rgb(247, 140, 108)">80</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">client </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> StreamPipesClient</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">client_config</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">config</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>In case you have never worked with the Python client before and have problems to get started,
please have a look at
our <a href="https://streampipes.apache.org/docs/docs/python/latest/tutorials/1-introduction-to-streampipes-python-client/" target="_blank" rel="noopener noreferrer">tutorial</a>.</p><p>If you already have an ONNX model and are only interested in applying it with StreamPipes on a data stream, you can skip
the following section.</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="model-training-with-historic-data">Model Training with Historic Data<a href="#model-training-with-historic-data" class="hash-link" aria-label="Direct link to Model Training with Historic Data" title="Direct link to Model Training with Historic Data"></a></h2><p>As said above, the aim of our model is to detect anomalies of the <code>volume_flow</code> parameter. For this task, we will use
<a href="https://en.wikipedia.org/wiki/Isolation_forest" target="_blank" rel="noopener noreferrer">Isolation Forests</a>. Please note that the focus of the tutorial is not
on training the model, so please be patient even though the training is very simplified and lacks important preparation
steps such as standardization.</p><p>As a first step, lets query the <code>flowrate</code> data from the StreamPipes data lake and extract the values of <code>volume_flow</code>
as a feature:</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token plain">flowrate_df </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> client</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">dataLakeMeasureApi</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">get</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token string" style="color:rgb(195, 232, 141)">&quot;flow-rate&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">to_pandas</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">X </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> flowrate_df</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token string" style="color:rgb(195, 232, 141)">&quot;volume_flow&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">values</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">reshape</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token operator" style="color:rgb(137, 221, 255)">-</span><span class="token number" style="color:rgb(247, 140, 108)">1</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> </span><span class="token number" style="color:rgb(247, 140, 108)">1</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">astype</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token string" style="color:rgb(195, 232, 141)">&quot;float32&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>As a next step, we can already train our model with the historic data:</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> sklearn</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">ensemble </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> IsolationForest</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">model </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> IsolationForest</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">contamination</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token number" style="color:rgb(247, 140, 108)">0.01</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">model</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">fit</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">X</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>The <code>contamination</code> parameter models the proportion of outliers in the data. See
the <a href="https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html" target="_blank" rel="noopener noreferrer">scikit-learn</a>
documentation for more information.</p><p>Here you can see how this simple model performs:</p><img loading="lazy" src="/img/blog/2024-03-26/prediction-analysis.png" class="img_ev3q"><p></p><p>This doesn&#x27;t look too bad, right? Let&#x27;s continue by converting our model to the ONNX representation.</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> onnxconverter_common </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> FloatTensorType</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> skl2onnx </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> to_onnx</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">model_onnx </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> to_onnx</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> model</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> initial_types</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token string" style="color:rgb(195, 232, 141)">&#x27;input&#x27;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> FloatTensorType</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> X</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">shape</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">1</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> target_opset</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token punctuation" style="color:rgb(199, 146, 234)">{</span><span class="token string" style="color:rgb(195, 232, 141)">&#x27;ai.onnx.ml&#x27;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> </span><span class="token number" style="color:rgb(247, 140, 108)">3</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">&#x27;ai.onnx&#x27;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> </span><span class="token number" style="color:rgb(247, 140, 108)">15</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">&#x27;&#x27;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> </span><span class="token number" style="color:rgb(247, 140, 108)">15</span><span class="token punctuation" style="color:rgb(199, 146, 234)">}</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">with</span><span class="token plain"> </span><span class="token builtin" style="color:rgb(130, 170, 255)">open</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token string" style="color:rgb(195, 232, 141)">&quot;isolation_forest.onnx&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">&quot;wb&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"> </span><span class="token keyword" style="font-style:italic">as</span><span class="token plain"> f</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> f</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">write</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">model_onnx</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">SerializeToString</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h2 class="anchor anchorWithStickyNavbar_LWe7" id="model-inference-with-live-data">Model Inference with Live Data<a href="#model-inference-with-live-data" class="hash-link" aria-label="Direct link to Model Inference with Live Data" title="Direct link to Model Inference with Live Data"></a></h2><p>Utilizing a pre-trained model within StreamPipes becomes seamless with the ONNX interoperability standard, enabling
effortless application of your existing model on live data streams.</p><p>Interacting with live data from StreamPipes is facilitated through StreamPipes functions. Below, we&#x27;ll create a Python
StreamPipes function that leverages an ONNX model to generate predictions for each incoming event, making the results
accessible as a data stream within StreamPipes for subsequent steps.</p><p>So let&#x27;s create an <code>ONNXFunction</code> that is capable of applying a model in ONNX representation to a StreamPipes data
stream.
If you&#x27;d like to read more details about how functions are defined, refer
to <a href="https://streampipes.apache.org/docs/docs/python/latest/tutorials/3-getting-live-data-from-the-streampipes-data-stream/" target="_blank" rel="noopener noreferrer">our tutorial</a>.</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> numpy </span><span class="token keyword" style="font-style:italic">as</span><span class="token plain"> np</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> onnxruntime </span><span class="token keyword" style="font-style:italic">as</span><span class="token plain"> rt</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">functions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">broker</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">broker_handler </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> get_broker_description</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">functions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">streampipes_function </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> StreamPipesFunction</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">functions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">utils</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">data_stream_generator </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> create_data_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> RuntimeType</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">functions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">utils</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">function_context </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> FunctionContext</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">model</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">resource </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> FunctionDefinition</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> DataStream</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> typing </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> Dict</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> Any</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> List</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">class</span><span class="token plain"> </span><span class="token class-name" style="color:rgb(255, 203, 107)">ONNXFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">StreamPipesFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">def</span><span class="token plain"> </span><span class="token function" style="color:rgb(130, 170, 255)">__init__</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> feature_names</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> </span><span class="token builtin" style="color:rgb(130, 170, 255)">list</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token builtin" style="color:rgb(130, 170, 255)">str</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> input_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> DataStream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> output_stream </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> create_data_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> name</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token string" style="color:rgb(195, 232, 141)">&quot;flowrate-prediction&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> attributes</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token punctuation" style="color:rgb(199, 146, 234)">{</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">&quot;is_anomaly&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> RuntimeType</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">BOOLEAN</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">value</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">}</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> broker</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">get_broker_description</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">input_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> function_definition </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> FunctionDefinition</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> consumed_streams</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token plain">input_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">element_id</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">add_output_data_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">output_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">feature_names </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> feature_names</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">input_name </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">output_name </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">session </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token builtin" style="color:rgb(130, 170, 255)">super</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">__init__</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">function_definition</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">function_definition</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>First, we need to take care about the data stream that is required to send the predictions from our function to
StreamPipes. Thus, we create a dedicated output data stream which we need to provide with the attributes our event will
consist of (a timestamp attribute is always added automatically). This output data stream needs to be registered at the
function definition which is to be passed to the parent class. Lastly, we need to define some instance variables that
are mainly required for the ONNX runtime.</p><p>Next, we need to ensure that ONNX runtime session is created on start up. Thus, we need to invoke an InferenceSession
and retrieving the corresponding configuration parameters:</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token keyword" style="font-style:italic">class</span><span class="token plain"> </span><span class="token class-name" style="color:rgb(255, 203, 107)">ONNXFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">StreamPipesFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">def</span><span class="token plain"> </span><span class="token function" style="color:rgb(130, 170, 255)">onServiceStarted</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> context</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> FunctionContext</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">-</span><span class="token operator" style="color:rgb(137, 221, 255)">&gt;</span><span class="token plain"> </span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">session </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> rt</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">InferenceSession</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> path_or_bytes</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token string" style="color:rgb(195, 232, 141)">&quot;isolation_forest.onnx&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> providers</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">rt</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">get_available_providers</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">input_name </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">session</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">get_inputs</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">name</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">output_name </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">session</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">get_outputs</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">name</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>Lastly, we need to implement the inference logic that is applied to every event.
If you have brought up your own model, you need to adapt line <code>10-13</code>:</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv codeBlockLinesWithNumbering_o6Pm"><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token keyword" style="font-style:italic">class</span><span class="token plain"> </span><span class="token class-name" style="color:rgb(255, 203, 107)">ONNXFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">StreamPipesFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain" style="display:inline-block"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain" style="display:inline-block"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">def</span><span class="token plain"> </span><span class="token function" style="color:rgb(130, 170, 255)">onEvent</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> event</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> Dict</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token builtin" style="color:rgb(130, 170, 255)">str</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> Any</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> streamId</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> </span><span class="token builtin" style="color:rgb(130, 170, 255)">str</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">-</span><span class="token operator" style="color:rgb(137, 221, 255)">&gt;</span><span class="token plain"> </span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> feature_vector </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">for</span><span class="token plain"> feature </span><span class="token keyword" style="font-style:italic">in</span><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">feature_names</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> feature_vector</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">append</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">event</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token plain">feature</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain" style="display:inline-block"></span></span><br></span><span class="token-line theme-code-block-highlighted-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> prediction </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">session</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">run</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span></span><br></span><span class="token-line theme-code-block-highlighted-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">output_name</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span></span><br></span><span class="token-line theme-code-block-highlighted-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">{</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">input_name</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> np</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">expand_dims</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">np</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">array</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">feature_vector</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> axis</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">astype</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token string" style="color:rgb(195, 232, 141)">&quot;float32&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">}</span><span class="token plain"></span></span><br></span><span class="token-line theme-code-block-highlighted-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain" style="display:inline-block"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> output </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">{</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">&quot;is_anomaly&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> </span><span class="token builtin" style="color:rgb(130, 170, 255)">int</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">prediction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">==</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">-</span><span class="token number" style="color:rgb(247, 140, 108)">1</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">}</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain" style="display:inline-block"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">add_output</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> stream_id</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">function_definition</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">get_output_stream_ids</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> event</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">output</span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain" style="display:inline-block"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">def</span><span class="token plain"> </span><span class="token function" style="color:rgb(130, 170, 255)">onServiceStopped</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">-</span><span class="token operator" style="color:rgb(137, 221, 255)">&gt;</span><span class="token plain"> </span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">pass</span></span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>Having the function code in place, we can start the function with the following:</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">functions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">registration </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> Registration</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">functions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">function_handler </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> FunctionHandler</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">stream </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> stream</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">for</span><span class="token plain"> stream</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">in</span><span class="token plain"> client</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">dataStreamApi</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token builtin" style="color:rgb(130, 170, 255)">all</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">if</span><span class="token plain"> stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">name </span><span class="token operator" style="color:rgb(137, 221, 255)">==</span><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">&quot;flow-rate&quot;</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">function </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> ONNXFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> feature_names</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token string" style="color:rgb(195, 232, 141)">&quot;volume_flow&quot;</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> input_stream</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">stream</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">registration </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> Registration</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">registration</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">register</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">function</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">function_handler </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> FunctionHandler</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">registration</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> client</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">function_handler</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">initializeFunctions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>We can now access the live values of the prediction in the StreamPipes UI, e.g., in the pipeline editor.</p><img loading="lazy" src="/img/blog/2024-03-26/tutorial-prediction-data-stream.png" class="img_ev3q"><p></p><p>From here on you can further work with the prediction events in StreamPipes, e.g., by sending notifications
to <a href="https://streampipes.apache.org/docs/next/pe/org.apache.streampipes.sinks.notifications.jvm.msteams/" target="_blank" rel="noopener noreferrer">MS Teams</a>.</p></div></article><nav class="pagination-nav docusaurus-mt-lg" aria-label="Blog post page navigation"><a class="pagination-nav__link pagination-nav__link--prev" href="/blog/2024/06/13/release-095/"><div class="pagination-nav__sublabel">Newer Post</div><div class="pagination-nav__label">Apache StreamPipes release 0.95.0</div></a><a class="pagination-nav__link pagination-nav__link--next" href="/blog/2023/11/28/release-093/"><div class="pagination-nav__sublabel">Older Post</div><div class="pagination-nav__label">Apache StreamPipes release 0.93.0</div></a></nav></main><div class="col col--2"><div class="tableOfContents_bqdL thin-scrollbar"><ul class="table-of-contents table-of-contents__left-border"><li><a href="#motivation" class="table-of-contents__link toc-highlight">Motivation</a></li><li><a href="#set-up--prepare-python-client" class="table-of-contents__link toc-highlight">Set Up &amp; Prepare Python Client</a></li><li><a href="#model-training-with-historic-data" class="table-of-contents__link toc-highlight">Model Training with Historic Data</a></li><li><a href="#model-inference-with-live-data" class="table-of-contents__link toc-highlight">Model Inference with Live Data</a></li></ul></div></div></div></div></div><footer class="container_yWIM"><div class="linksRow_XFIr"><div class="linksCol_rFJL"><div>ASF</div><ul><li class="footer__item"><a href="https://www.apache.org/" target="_blank" rel="noopener noreferrer"><span></span><span>Foundation</span></a></li><li class="footer__item"><a href="https://www.apache.org/licenses/" target="_blank" rel="noopener noreferrer"><span></span><span>License</span></a></li><li class="footer__item"><a href="https://www.apache.org/events/" target="_blank" rel="noopener noreferrer"><span></span><span>Events</span></a></li><li class="footer__item"><a href="https://www.apache.org/security/" target="_blank" rel="noopener noreferrer"><span></span><span>Security</span></a></li><li class="footer__item"><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank" rel="noopener noreferrer"><span></span><span>Sponsorship</span></a></li><li class="footer__item"><a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noopener noreferrer"><span></span><span>Thanks</span></a></li></ul></div><div class="linksCol_rFJL"><div>Community</div><ul><li class="footer__item"><a href="https://github.com/apache/streampipes/issues" target="_blank" rel="noopener noreferrer"><span></span><span>GitHub Issues</span></a></li><li class="footer__item"><a href="https://github.com/apache/streampipes/discussions" target="_blank" rel="noopener noreferrer"><span></span><span>Github Discussions</span></a></li><li class="footer__item"><a href="https://twitter.com/StreamPipes" target="_blank" rel="noopener noreferrer"><span></span><span>Twitter</span></a></li><li class="footer__item"><a href="https://www.linkedin.com/company/apache-streampipes" target="_blank" rel="noopener noreferrer"><span></span><span>LinkedIn</span></a></li></ul></div><div class="linksCol_rFJL"><div>More</div><ul><li class="footer__item"><a target="_parent" href="/blog/"><span></span><span>Blog</span></a></li><li class="footer__item"><a href="https://privacy.apache.org/policies/privacy-policy-public.html" target="_parent" rel="noopener noreferrer"><span></span><span>Privacy</span></a></li></ul></div></div><div class="copyright_AoBa"><a href="https://www.apache.org/" target="_blank" rel="noopener noreferrer"><span style="display:inline-block;width:231.25px;height:40px"></span></a><div>Copyright © 2019-2025 The Apache Software Foundation. Apache StreamPipes, Apache, the Apache feather logo, and the Apache StreamPipes project logo are either registered trademarks or trademarks of the Apache Software Foundation.</div></div></footer></div>
<script src="/assets/js/runtime~main.d38cef4e.js"></script>
<script src="/assets/js/main.6ee83347.js"></script>
</body>
</html>