| <!doctype html> |
| <html lang="en" dir="ltr" class="blog-wrapper blog-post-page plugin-blog plugin-id-default"> |
| <head> |
| <meta charset="UTF-8"> |
| <meta name="generator" content="Docusaurus v2.4.1"> |
| <title data-rh="true">Anomaly Detection with StreamPipes Functions in Python and ONNX | Apache StreamPipes</title><meta data-rh="true" name="viewport" content="width=device-width,initial-scale=1"><meta data-rh="true" name="twitter:card" content="summary_large_image"><meta data-rh="true" property="og:image" content="https://streampipes.apache.org/img/favicon.png"><meta data-rh="true" name="twitter:image" content="https://streampipes.apache.org/img/favicon.png"><meta data-rh="true" property="og:url" content="https://streampipes.apache.org/blog/2024/03/27/anomaly-detection-with-python-functions/"><meta data-rh="true" name="docusaurus_locale" content="en"><meta data-rh="true" name="docusaurus_tag" content="default"><meta data-rh="true" name="docsearch:language" content="en"><meta data-rh="true" name="docsearch:docusaurus_tag" content="default"><meta data-rh="true" property="og:title" content="Anomaly Detection with StreamPipes Functions in Python and ONNX | Apache StreamPipes"><meta data-rh="true" name="description" content="Apache StreamPipes saves the day when it comes to connecting to data sources in the IIoT world. Want to do more with"><meta data-rh="true" property="og:description" content="Apache StreamPipes saves the day when it comes to connecting to data sources in the IIoT world. Want to do more with"><meta data-rh="true" property="og:type" content="article"><meta data-rh="true" property="article:published_time" content="2024-03-27T00:00:00.000Z"><meta data-rh="true" property="article:author" content="https://github.com/bossenti"><link data-rh="true" rel="icon" href="/img/favicon.png"><link data-rh="true" rel="canonical" href="https://streampipes.apache.org/blog/2024/03/27/anomaly-detection-with-python-functions/"><link data-rh="true" rel="alternate" href="https://streampipes.apache.org/blog/2024/03/27/anomaly-detection-with-python-functions/" hreflang="en"><link data-rh="true" rel="alternate" href="https://streampipes.apache.org/blog/2024/03/27/anomaly-detection-with-python-functions/" hreflang="x-default"><link rel="alternate" type="application/rss+xml" href="/blog/rss.xml" title="Apache StreamPipes RSS Feed"> |
| <link rel="alternate" type="application/atom+xml" href="/blog/atom.xml" title="Apache StreamPipes Atom Feed"> |
| |
| |
| |
| <link rel="preconnect" href="//analytics.apache.org/"> |
| <script>var _paq=window._paq=window._paq||[];_paq.push(["disableCookies"]),_paq.push(["trackPageView"]),_paq.push(["enableLinkTracking"]),function(){var a="//analytics.apache.org/";_paq.push(["setTrackerUrl",a+"matomo.php"]),_paq.push(["setSiteId","35"]);var e=document,p=e.createElement("script"),t=e.getElementsByTagName("script")[0];p.type="text/javascript",p.async=!0,p.src=a+"matomo.js",t.parentNode.insertBefore(p,t)}()</script> |
| |
| |
| <link rel="stylesheet" href="/css/slick.min.css"> |
| <link rel="stylesheet" href="/css/slick-theme.min.css"> |
| <link rel="stylesheet" href="/css/fonts.css"> |
| <link rel="stylesheet" href="/css/admonition.css"> |
| <link rel="stylesheet" href="/css/custom.css"> |
| <link rel="stylesheet" href="/css/code-block-buttons.css"> |
| <link rel="stylesheet" href="/css/all.min.css"> |
| <link rel="stylesheet" href="/css/bootstrap.min.css"> |
| <link rel="stylesheet" href="/css/buttons.css"> |
| <link rel="stylesheet" href="/css/team.css"> |
| <link rel="stylesheet" href="/css/custom-website.css"> |
| <link rel="stylesheet" href="/css/page-banner.css"> |
| <link rel="stylesheet" href="/css/responsive.css"> |
| <script src="/js/jquery-3.3.1.min.js"></script> |
| <script src="/js/buttons.js"></script> |
| <script src="/js/slick.min.js"></script> |
| <script src="/js/custom.js"></script> |
| <script src="/js/clipboard.min.js"></script> |
| <script src="/js/code-block-buttons.js"></script><link rel="stylesheet" href="/assets/css/styles.a4107cef.css"> |
| <link rel="preload" href="/assets/js/runtime~main.d38cef4e.js" as="script"> |
| <link rel="preload" href="/assets/js/main.6ee83347.js" as="script"> |
| </head> |
| <body class="navigation-with-keyboard"> |
| <script>!function(){function t(t){document.documentElement.setAttribute("data-theme",t)}var e=function(){var t=null;try{t=new URLSearchParams(window.location.search).get("docusaurus-theme")}catch(t){}return t}()||function(){var t=null;try{t=localStorage.getItem("theme")}catch(t){}return t}();t(null!==e?e:"light")}(),document.documentElement.setAttribute("data-announcement-bar-initially-dismissed",function(){try{return"true"===localStorage.getItem("docusaurus.announcement.dismiss")}catch(t){}return!1}())</script><div id="__docusaurus"> |
| <div role="region" aria-label="Skip to main content"><a class="skipToContent_fXgn" href="#__docusaurus_skipToContent_fallback">Skip to main content</a></div><div class="announcementBar_mb4j" style="background-color:var(--color-accent);color:white" role="banner"><div class="content_knG7 announcementBarContent_xLdY">Apache StreamPipes 0.97.0 is available! ⭐️</div></div><nav aria-label="Main" class="navbar navbar--fixed-top"><div class="navbar__inner"><div class="navbar__items"><button aria-label="Toggle navigation bar" aria-expanded="false" class="navbar__toggle clean-btn" type="button"><svg width="30" height="30" viewBox="0 0 30 30" aria-hidden="true"><path stroke="currentColor" stroke-linecap="round" stroke-miterlimit="10" stroke-width="2" d="M4 7h22M4 15h22M4 23h22"></path></svg></button><a class="navbar__brand" href="/"><div class="navbar__logo"><img src="/img/sp-logo-color.png" alt="Apache StreamPipes" class="themedImage_ToTc themedImage--light_HNdA"><img src="/img/sp-logo-color.png" alt="Apache StreamPipes" class="themedImage_ToTc themedImage--dark_i4oU"></div></a></div><div class="navbar__items navbar__items--right"><a aria-current="page" class="navbar__item navbar__link navbar__link--active" href="/">Home</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Docs</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/docs/user-guide-introduction/">Apache StreamPipes - User Guide</a></li><li><a href="https://streampipes.apache.org/docs/docs/python/latest/" target="_blank" rel="noopener noreferrer" class="dropdown__link">Apache StreamPipes - Python</a></li><li><a class="dropdown__link" href="/docs/faq-common-problems/">FAQ</a></li></ul></div><a class="navbar__item navbar__link" href="/download/">Download</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Resources</a><ul class="dropdown__menu"><li><a href="https://github.com/apache/streampipes/issues" target="_blank" rel="noopener noreferrer" class="dropdown__link">Issue Tracker<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="https://github.com/apache/streampipes/discussions" target="_blank" rel="noopener noreferrer" class="dropdown__link">Github Discussions Support<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="http://cwiki.apache.org/confluence/display/STREAMPIPES" target="_blank" rel="noopener noreferrer" class="dropdown__link">Developer wiki<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a class="dropdown__link" href="/resources/slides-videos/">Slides & Videos</a></li></ul></div><a aria-current="page" class="navbar__item navbar__link navbar__link--active" href="/blog/">Blog</a><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Community</a><ul class="dropdown__menu"><li><a class="dropdown__link" href="/community/mailing-lists/">Mailing Lists</a></li><li><a class="dropdown__link" href="/community/team/">Team</a></li><li><a class="dropdown__link" href="/community/get-involved/">Get involved</a></li><li><a class="dropdown__link" href="/community/talks-events/">Talks & Events</a></li></ul></div><div class="navbar__item dropdown dropdown--hoverable dropdown--right"><a href="#" aria-haspopup="true" aria-expanded="false" role="button" class="navbar__link">Apache</a><ul class="dropdown__menu"><li><a href="https://www.apache.org/" target="_blank" rel="noopener noreferrer" class="dropdown__link">Apache Software Foundation<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="https://www.apache.org/foundation/how-it-works.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">How Apache Works<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="https://www.apache.org/licenses" target="_blank" rel="noopener noreferrer" class="dropdown__link">License<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="https://www.apache.org/security" target="_blank" rel="noopener noreferrer" class="dropdown__link">Security<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Sponsoring Apache<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li><li><a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noopener noreferrer" class="dropdown__link">Thanks<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24" class="iconExternalLink_nPIU"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg></a></li></ul></div><a href="https://github.com/apache/streampipes" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link header-link h-github" aria-label="Apache StreamPipes Github"></a><a href="https://linkedin.com/company/apache-streampipes" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link header-link h-linkedin" aria-label="Apache StreamPipes LinkedIn"></a><a href="https://twitter.com/streampipes" target="_blank" rel="noopener noreferrer" class="navbar__item navbar__link header-link h-twitter" aria-label="Apache StreamPipes Twitter"></a><div class="searchBox_ZlJk"></div></div></div><div role="presentation" class="navbar-sidebar__backdrop"></div></nav><div id="__docusaurus_skipToContent_fallback" class="main-wrapper mainWrapper_z2l0"><div class="container margin-vert--lg"><div class="row"><aside class="col col--3"><nav class="sidebar_re4s thin-scrollbar" aria-label="Blog recent posts navigation"><div class="sidebarItemTitle_pO2u margin-bottom--md">Recent posts</div><ul class="sidebarItemList_Yudw clean-list"><li class="sidebarItem__DBe"><a class="sidebarItemLink_mo7H" href="/blog/2025/06/11/oi4-adapter/">From Sensor to Insight - Rapid IIoT Integration for IO-Link Devices with Apache StreamPipes</a></li><li class="sidebarItem__DBe"><a class="sidebarItemLink_mo7H" href="/blog/2025/05/20/opc-ua-certificates/">Secure OPC-UA Integration with Apache StreamPipes: A Comprehensive Guide</a></li><li class="sidebarItem__DBe"><a class="sidebarItemLink_mo7H" href="/blog/2025/04/30/usage-based-maintanence/">Usage-Based Maintenance with Apache StreamPipes</a></li><li class="sidebarItem__DBe"><a class="sidebarItemLink_mo7H" href="/blog/2025/04/28/prompt-processor/">Bringing LLM Power to Every Pipeline – The Multi-Model Prompt Processor</a></li><li class="sidebarItem__DBe"><a class="sidebarItemLink_mo7H" href="/blog/2025/04/09/javascript-processor/">How to Use the JavaScript Evaluator Processor</a></li></ul></nav></aside><main class="col col--7" itemscope="" itemtype="http://schema.org/Blog"><article itemprop="blogPost" itemscope="" itemtype="http://schema.org/BlogPosting"><header><h1 class="title_f1Hy" itemprop="headline">Anomaly Detection with StreamPipes Functions in Python and ONNX</h1><div class="container_mt6G margin-vert--md"><time datetime="2024-03-27T00:00:00.000Z" itemprop="datePublished">March 27, 2024</time> · <!-- -->6 min read</div><div class="margin-top--md margin-bottom--sm row"><div class="col col--6 authorCol_Hf19"><div class="avatar margin-bottom--sm"><a href="https://github.com/bossenti" target="_blank" rel="noopener noreferrer" class="avatar__photo-link"><img class="avatar__photo" src="/img/bossenmaier.png" alt="Tim Bossenmaier"></a><div class="avatar__intro" itemprop="author" itemscope="" itemtype="https://schema.org/Person"><div class="avatar__name"><a href="https://github.com/bossenti" target="_blank" rel="noopener noreferrer" itemprop="url"><span itemprop="name">Tim Bossenmaier</span></a></div></div></div></div></div></header><div id="__blog-post-container" class="markdown" itemprop="articleBody"><p>Apache StreamPipes saves the day when it comes to connecting to data sources in the IIoT world. Want to do more with |
| your IIoT data than just analyze it in a dashboard? If so, this blog post is for you! We'll show you how to extract |
| historical data from StreamPipes, use it to train a machine learning model, bring the model back to StreamPipes using |
| ONNX, and apply the model to live data.</p><center><img loading="lazy" class="blog-image" style="max-width:75%" src="/img/blog/2024-03-26/prediction-analysis.png" alt="anomaly-detection" class="img_ev3q"><br></center><h2 class="anchor anchorWithStickyNavbar_LWe7" id="motivation">Motivation<a href="#motivation" class="hash-link" aria-label="Direct link to Motivation" title="Direct link to Motivation"></a></h2><p>With this blogpost we want to illustrate how one can easily extract historical IIoT data collected with StreamPipes, |
| train a machine learning model on this data and bringing the model back to StreamPipes with the interoperability |
| standard <a href="https://onnx.ai" target="_blank" rel="noopener noreferrer">ONNX</a> to make inference in live data.</p><p>A very common use case in the area of IIoT is the detection of anomalies, so we want to tackle this challenge in this |
| article as well. We will use data generated by |
| the <a href="https://streampipes.apache.org/docs/pe/org.apache.streampipes.connect.iiot.adapters.simulator.machine/" target="_blank" rel="noopener noreferrer">Machine Data Simulator</a> |
| adapter. More specifically, we will focus on the <code>flowrate</code> data, which consists of various sensor values coming from a |
| water pipe system. Our goal is to keep an eye on the parameter <code>volume_flow</code>, which represents the current volume flow |
| in |
| cubic meters/second. For this parameter, we want to detect anomalies that could indicate problems such as leaks, |
| blockages, etc.</p><p>To get the concerned data, we simply need to create an instance of the machine data simulator and persist the data in |
| the data lake:</p><p><img loading="lazy" src="https://raw.githubusercontent.com/apache/streampipes/dev/streampipes-client-python/docs/img/tutorial-preparation.gif" alt="tutorial-preparation" class="img_ev3q"></p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="set-up--prepare-python-client">Set Up & Prepare Python Client<a href="#set-up--prepare-python-client" class="hash-link" aria-label="Direct link to Set Up & Prepare Python Client" title="Direct link to Set Up & Prepare Python Client"></a></h2><p>As a prerequisite, we need to install the StreamPipes Python client and all other dependencies,</p><div class="language-bash codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-bash codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token plain">pip </span><span class="token function" style="color:rgb(130, 170, 255)">install</span><span class="token plain"> git+https://github.com/apache/streampipes.git</span><span class="token comment" style="color:rgb(105, 112, 152);font-style:italic">#subdirectory=streampipes-client-python</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">pip </span><span class="token function" style="color:rgb(130, 170, 255)">install</span><span class="token plain"> scikit-learn</span><span class="token operator" style="color:rgb(137, 221, 255)">==</span><span class="token number" style="color:rgb(247, 140, 108)">1.4</span><span class="token plain">.0 </span><span class="token assign-left variable" style="color:rgb(191, 199, 213)">skl2onnx</span><span class="token operator" style="color:rgb(137, 221, 255)">==</span><span class="token number" style="color:rgb(247, 140, 108)">1.16</span><span class="token plain">.0 </span><span class="token assign-left variable" style="color:rgb(191, 199, 213)">onnxruntime</span><span class="token operator" style="color:rgb(137, 221, 255)">==</span><span class="token number" style="color:rgb(247, 140, 108)">1.17</span><span class="token plain">.1</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>The next step is to configure and initialize an instance of the client.</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> os</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">client </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> StreamPipesClient</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">client</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">config </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> StreamPipesClientConfig</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">client</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">credential_provider </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> StreamPipesApiKeyCredentials</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">os</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">environ</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token string" style="color:rgb(195, 232, 141)">"BROKER-HOST"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">"localhost"</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">os</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">environ</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token string" style="color:rgb(195, 232, 141)">"KAFKA-PORT"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">"9094"</span><span class="token plain"> </span><span class="token comment" style="color:rgb(105, 112, 152);font-style:italic"># When using Kafka as message broker</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">config </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> StreamPipesClientConfig</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> credential_provider</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">StreamPipesApiKeyCredentials</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> username</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token string" style="color:rgb(195, 232, 141)">"admin@streampipes.apache.org"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> api_key</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token string" style="color:rgb(195, 232, 141)">"TOKEN"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> host_address</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token string" style="color:rgb(195, 232, 141)">"localhost"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> https_disabled</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token boolean" style="color:rgb(255, 88, 116)">True</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> port</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token number" style="color:rgb(247, 140, 108)">80</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">client </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> StreamPipesClient</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">client_config</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">config</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>In case you have never worked with the Python client before and have problems to get started, |
| please have a look at |
| our <a href="https://streampipes.apache.org/docs/docs/python/latest/tutorials/1-introduction-to-streampipes-python-client/" target="_blank" rel="noopener noreferrer">tutorial</a>.</p><p>If you already have an ONNX model and are only interested in applying it with StreamPipes on a data stream, you can skip |
| the following section.</p><h2 class="anchor anchorWithStickyNavbar_LWe7" id="model-training-with-historic-data">Model Training with Historic Data<a href="#model-training-with-historic-data" class="hash-link" aria-label="Direct link to Model Training with Historic Data" title="Direct link to Model Training with Historic Data"></a></h2><p>As said above, the aim of our model is to detect anomalies of the <code>volume_flow</code> parameter. For this task, we will use |
| <a href="https://en.wikipedia.org/wiki/Isolation_forest" target="_blank" rel="noopener noreferrer">Isolation Forests</a>. Please note that the focus of the tutorial is not |
| on training the model, so please be patient even though the training is very simplified and lacks important preparation |
| steps such as standardization.</p><p>As a first step, lets query the <code>flowrate</code> data from the StreamPipes data lake and extract the values of <code>volume_flow</code> |
| as a feature:</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token plain">flowrate_df </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> client</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">dataLakeMeasureApi</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">get</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token string" style="color:rgb(195, 232, 141)">"flow-rate"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">to_pandas</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">X </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> flowrate_df</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token string" style="color:rgb(195, 232, 141)">"volume_flow"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">values</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">reshape</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token operator" style="color:rgb(137, 221, 255)">-</span><span class="token number" style="color:rgb(247, 140, 108)">1</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> </span><span class="token number" style="color:rgb(247, 140, 108)">1</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">astype</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token string" style="color:rgb(195, 232, 141)">"float32"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>As a next step, we can already train our model with the historic data:</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> sklearn</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">ensemble </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> IsolationForest</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">model </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> IsolationForest</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">contamination</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token number" style="color:rgb(247, 140, 108)">0.01</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">model</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">fit</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">X</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>The <code>contamination</code> parameter models the proportion of outliers in the data. See |
| the <a href="https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html" target="_blank" rel="noopener noreferrer">scikit-learn</a> |
| documentation for more information.</p><p>Here you can see how this simple model performs:</p><img loading="lazy" src="/img/blog/2024-03-26/prediction-analysis.png" class="img_ev3q"><p></p><p>This doesn't look too bad, right? Let's continue by converting our model to the ONNX representation.</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> onnxconverter_common </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> FloatTensorType</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> skl2onnx </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> to_onnx</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">model_onnx </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> to_onnx</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> model</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> initial_types</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token string" style="color:rgb(195, 232, 141)">'input'</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> FloatTensorType</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> X</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">shape</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">1</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> target_opset</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token punctuation" style="color:rgb(199, 146, 234)">{</span><span class="token string" style="color:rgb(195, 232, 141)">'ai.onnx.ml'</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> </span><span class="token number" style="color:rgb(247, 140, 108)">3</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">'ai.onnx'</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> </span><span class="token number" style="color:rgb(247, 140, 108)">15</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">''</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> </span><span class="token number" style="color:rgb(247, 140, 108)">15</span><span class="token punctuation" style="color:rgb(199, 146, 234)">}</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">with</span><span class="token plain"> </span><span class="token builtin" style="color:rgb(130, 170, 255)">open</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token string" style="color:rgb(195, 232, 141)">"isolation_forest.onnx"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">"wb"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"> </span><span class="token keyword" style="font-style:italic">as</span><span class="token plain"> f</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> f</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">write</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">model_onnx</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">SerializeToString</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><h2 class="anchor anchorWithStickyNavbar_LWe7" id="model-inference-with-live-data">Model Inference with Live Data<a href="#model-inference-with-live-data" class="hash-link" aria-label="Direct link to Model Inference with Live Data" title="Direct link to Model Inference with Live Data"></a></h2><p>Utilizing a pre-trained model within StreamPipes becomes seamless with the ONNX interoperability standard, enabling |
| effortless application of your existing model on live data streams.</p><p>Interacting with live data from StreamPipes is facilitated through StreamPipes functions. Below, we'll create a Python |
| StreamPipes function that leverages an ONNX model to generate predictions for each incoming event, making the results |
| accessible as a data stream within StreamPipes for subsequent steps.</p><p>So let's create an <code>ONNXFunction</code> that is capable of applying a model in ONNX representation to a StreamPipes data |
| stream. |
| If you'd like to read more details about how functions are defined, refer |
| to <a href="https://streampipes.apache.org/docs/docs/python/latest/tutorials/3-getting-live-data-from-the-streampipes-data-stream/" target="_blank" rel="noopener noreferrer">our tutorial</a>.</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> numpy </span><span class="token keyword" style="font-style:italic">as</span><span class="token plain"> np</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> onnxruntime </span><span class="token keyword" style="font-style:italic">as</span><span class="token plain"> rt</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">functions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">broker</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">broker_handler </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> get_broker_description</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">functions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">streampipes_function </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> StreamPipesFunction</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">functions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">utils</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">data_stream_generator </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> create_data_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> RuntimeType</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">functions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">utils</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">function_context </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> FunctionContext</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">model</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">resource </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> FunctionDefinition</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> DataStream</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> typing </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> Dict</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> Any</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> List</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">class</span><span class="token plain"> </span><span class="token class-name" style="color:rgb(255, 203, 107)">ONNXFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">StreamPipesFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">def</span><span class="token plain"> </span><span class="token function" style="color:rgb(130, 170, 255)">__init__</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> feature_names</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> </span><span class="token builtin" style="color:rgb(130, 170, 255)">list</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token builtin" style="color:rgb(130, 170, 255)">str</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> input_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> DataStream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> output_stream </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> create_data_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> name</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token string" style="color:rgb(195, 232, 141)">"flowrate-prediction"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> attributes</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token punctuation" style="color:rgb(199, 146, 234)">{</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">"is_anomaly"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> RuntimeType</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">BOOLEAN</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">value</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">}</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> broker</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">get_broker_description</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">input_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> function_definition </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> FunctionDefinition</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> consumed_streams</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token plain">input_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">element_id</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">add_output_data_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">output_stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">feature_names </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> feature_names</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">input_name </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">output_name </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">session </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token builtin" style="color:rgb(130, 170, 255)">super</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">__init__</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">function_definition</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">function_definition</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>First, we need to take care about the data stream that is required to send the predictions from our function to |
| StreamPipes. Thus, we create a dedicated output data stream which we need to provide with the attributes our event will |
| consist of (a timestamp attribute is always added automatically). This output data stream needs to be registered at the |
| function definition which is to be passed to the parent class. Lastly, we need to define some instance variables that |
| are mainly required for the ONNX runtime.</p><p>Next, we need to ensure that ONNX runtime session is created on start up. Thus, we need to invoke an InferenceSession |
| and retrieving the corresponding configuration parameters:</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token keyword" style="font-style:italic">class</span><span class="token plain"> </span><span class="token class-name" style="color:rgb(255, 203, 107)">ONNXFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">StreamPipesFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">def</span><span class="token plain"> </span><span class="token function" style="color:rgb(130, 170, 255)">onServiceStarted</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> context</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> FunctionContext</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">-</span><span class="token operator" style="color:rgb(137, 221, 255)">></span><span class="token plain"> </span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">session </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> rt</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">InferenceSession</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> path_or_bytes</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token string" style="color:rgb(195, 232, 141)">"isolation_forest.onnx"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> providers</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">rt</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">get_available_providers</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">input_name </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">session</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">get_inputs</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">name</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">output_name </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">session</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">get_outputs</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">name</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>Lastly, we need to implement the inference logic that is applied to every event. |
| If you have brought up your own model, you need to adapt line <code>10-13</code>:</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv codeBlockLinesWithNumbering_o6Pm"><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token keyword" style="font-style:italic">class</span><span class="token plain"> </span><span class="token class-name" style="color:rgb(255, 203, 107)">ONNXFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">StreamPipesFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain" style="display:inline-block"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain" style="display:inline-block"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">def</span><span class="token plain"> </span><span class="token function" style="color:rgb(130, 170, 255)">onEvent</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> event</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> Dict</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token builtin" style="color:rgb(130, 170, 255)">str</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> Any</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> streamId</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> </span><span class="token builtin" style="color:rgb(130, 170, 255)">str</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">-</span><span class="token operator" style="color:rgb(137, 221, 255)">></span><span class="token plain"> </span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> feature_vector </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">for</span><span class="token plain"> feature </span><span class="token keyword" style="font-style:italic">in</span><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">feature_names</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> feature_vector</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">append</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">event</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token plain">feature</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain" style="display:inline-block"></span></span><br></span><span class="token-line theme-code-block-highlighted-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> prediction </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">session</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">run</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span></span><br></span><span class="token-line theme-code-block-highlighted-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">output_name</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span></span><br></span><span class="token-line theme-code-block-highlighted-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">{</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">input_name</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> np</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">expand_dims</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">np</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">array</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">feature_vector</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> axis</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">astype</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token string" style="color:rgb(195, 232, 141)">"float32"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">}</span><span class="token plain"></span></span><br></span><span class="token-line theme-code-block-highlighted-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain" style="display:inline-block"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> output </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">{</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">"is_anomaly"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"> </span><span class="token builtin" style="color:rgb(130, 170, 255)">int</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">prediction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">==</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">-</span><span class="token number" style="color:rgb(247, 140, 108)">1</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">}</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain" style="display:inline-block"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">add_output</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> stream_id</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">function_definition</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">get_output_stream_ids</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> event</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">output</span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain" style="display:inline-block"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">def</span><span class="token plain"> </span><span class="token function" style="color:rgb(130, 170, 255)">onServiceStopped</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">self</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"> </span><span class="token operator" style="color:rgb(137, 221, 255)">-</span><span class="token operator" style="color:rgb(137, 221, 255)">></span><span class="token plain"> </span><span class="token boolean" style="color:rgb(255, 88, 116)">None</span><span class="token punctuation" style="color:rgb(199, 146, 234)">:</span><span class="token plain"></span></span><br></span><span class="token-line codeLine_lJS_" style="color:#bfc7d5"><span class="codeLineNumber_Tfdd"></span><span class="codeLineContent_feaV"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">pass</span></span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>Having the function code in place, we can start the function with the following:</p><div class="language-python codeBlockContainer_Ckt0 theme-code-block" style="--prism-color:#bfc7d5;--prism-background-color:#292d3e"><div class="codeBlockContent_biex"><pre tabindex="0" class="prism-code language-python codeBlock_bY9V thin-scrollbar"><code class="codeBlockLines_e6Vv"><span class="token-line" style="color:#bfc7d5"><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">functions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">registration </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> Registration</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token keyword" style="font-style:italic">from</span><span class="token plain"> streampipes</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">functions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">function_handler </span><span class="token keyword" style="font-style:italic">import</span><span class="token plain"> FunctionHandler</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">stream </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> </span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> stream</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">for</span><span class="token plain"> stream</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">in</span><span class="token plain"> client</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">dataStreamApi</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token builtin" style="color:rgb(130, 170, 255)">all</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> </span><span class="token keyword" style="font-style:italic">if</span><span class="token plain"> stream</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">name </span><span class="token operator" style="color:rgb(137, 221, 255)">==</span><span class="token plain"> </span><span class="token string" style="color:rgb(195, 232, 141)">"flow-rate"</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token number" style="color:rgb(247, 140, 108)">0</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">function </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> ONNXFunction</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> feature_names</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token punctuation" style="color:rgb(199, 146, 234)">[</span><span class="token string" style="color:rgb(195, 232, 141)">"volume_flow"</span><span class="token punctuation" style="color:rgb(199, 146, 234)">]</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"> input_stream</span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain">stream</span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain"></span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain" style="display:inline-block"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">registration </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> Registration</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">registration</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">register</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">function</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">function_handler </span><span class="token operator" style="color:rgb(137, 221, 255)">=</span><span class="token plain"> FunctionHandler</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token plain">registration</span><span class="token punctuation" style="color:rgb(199, 146, 234)">,</span><span class="token plain"> client</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><span class="token plain"></span><br></span><span class="token-line" style="color:#bfc7d5"><span class="token plain">function_handler</span><span class="token punctuation" style="color:rgb(199, 146, 234)">.</span><span class="token plain">initializeFunctions</span><span class="token punctuation" style="color:rgb(199, 146, 234)">(</span><span class="token punctuation" style="color:rgb(199, 146, 234)">)</span><br></span></code></pre><div class="buttonGroup__atx"><button type="button" aria-label="Copy code to clipboard" title="Copy" class="clean-btn"><span class="copyButtonIcons_eSgA" aria-hidden="true"><svg viewBox="0 0 24 24" class="copyButtonIcon_y97N"><path fill="currentColor" d="M19,21H8V7H19M19,5H8A2,2 0 0,0 6,7V21A2,2 0 0,0 8,23H19A2,2 0 0,0 21,21V7A2,2 0 0,0 19,5M16,1H4A2,2 0 0,0 2,3V17H4V3H16V1Z"></path></svg><svg viewBox="0 0 24 24" class="copyButtonSuccessIcon_LjdS"><path fill="currentColor" d="M21,7L9,19L3.5,13.5L4.91,12.09L9,16.17L19.59,5.59L21,7Z"></path></svg></span></button></div></div></div><p>We can now access the live values of the prediction in the StreamPipes UI, e.g., in the pipeline editor.</p><img loading="lazy" src="/img/blog/2024-03-26/tutorial-prediction-data-stream.png" class="img_ev3q"><p></p><p>From here on you can further work with the prediction events in StreamPipes, e.g., by sending notifications |
| to <a href="https://streampipes.apache.org/docs/next/pe/org.apache.streampipes.sinks.notifications.jvm.msteams/" target="_blank" rel="noopener noreferrer">MS Teams</a>.</p></div></article><nav class="pagination-nav docusaurus-mt-lg" aria-label="Blog post page navigation"><a class="pagination-nav__link pagination-nav__link--prev" href="/blog/2024/06/13/release-095/"><div class="pagination-nav__sublabel">Newer Post</div><div class="pagination-nav__label">Apache StreamPipes release 0.95.0</div></a><a class="pagination-nav__link pagination-nav__link--next" href="/blog/2023/11/28/release-093/"><div class="pagination-nav__sublabel">Older Post</div><div class="pagination-nav__label">Apache StreamPipes release 0.93.0</div></a></nav></main><div class="col col--2"><div class="tableOfContents_bqdL thin-scrollbar"><ul class="table-of-contents table-of-contents__left-border"><li><a href="#motivation" class="table-of-contents__link toc-highlight">Motivation</a></li><li><a href="#set-up--prepare-python-client" class="table-of-contents__link toc-highlight">Set Up & Prepare Python Client</a></li><li><a href="#model-training-with-historic-data" class="table-of-contents__link toc-highlight">Model Training with Historic Data</a></li><li><a href="#model-inference-with-live-data" class="table-of-contents__link toc-highlight">Model Inference with Live Data</a></li></ul></div></div></div></div></div><footer class="container_yWIM"><div class="linksRow_XFIr"><div class="linksCol_rFJL"><div>ASF</div><ul><li class="footer__item"><a href="https://www.apache.org/" target="_blank" rel="noopener noreferrer"><span></span><span>Foundation</span></a></li><li class="footer__item"><a href="https://www.apache.org/licenses/" target="_blank" rel="noopener noreferrer"><span></span><span>License</span></a></li><li class="footer__item"><a href="https://www.apache.org/events/" target="_blank" rel="noopener noreferrer"><span></span><span>Events</span></a></li><li class="footer__item"><a href="https://www.apache.org/security/" target="_blank" rel="noopener noreferrer"><span></span><span>Security</span></a></li><li class="footer__item"><a href="https://www.apache.org/foundation/sponsorship.html" target="_blank" rel="noopener noreferrer"><span></span><span>Sponsorship</span></a></li><li class="footer__item"><a href="https://www.apache.org/foundation/thanks.html" target="_blank" rel="noopener noreferrer"><span></span><span>Thanks</span></a></li></ul></div><div class="linksCol_rFJL"><div>Community</div><ul><li class="footer__item"><a href="https://github.com/apache/streampipes/issues" target="_blank" rel="noopener noreferrer"><span></span><span>GitHub Issues</span></a></li><li class="footer__item"><a href="https://github.com/apache/streampipes/discussions" target="_blank" rel="noopener noreferrer"><span></span><span>Github Discussions</span></a></li><li class="footer__item"><a href="https://twitter.com/StreamPipes" target="_blank" rel="noopener noreferrer"><span></span><span>Twitter</span></a></li><li class="footer__item"><a href="https://www.linkedin.com/company/apache-streampipes" target="_blank" rel="noopener noreferrer"><span></span><span>LinkedIn</span></a></li></ul></div><div class="linksCol_rFJL"><div>More</div><ul><li class="footer__item"><a target="_parent" href="/blog/"><span></span><span>Blog</span></a></li><li class="footer__item"><a href="https://privacy.apache.org/policies/privacy-policy-public.html" target="_parent" rel="noopener noreferrer"><span></span><span>Privacy</span></a></li></ul></div></div><div class="copyright_AoBa"><a href="https://www.apache.org/" target="_blank" rel="noopener noreferrer"><span style="display:inline-block;width:231.25px;height:40px"></span></a><div>Copyright © 2019-2025 The Apache Software Foundation. Apache StreamPipes, Apache, the Apache feather logo, and the Apache StreamPipes project logo are either registered trademarks or trademarks of the Apache Software Foundation.</div></div></footer></div> |
| <script src="/assets/js/runtime~main.d38cef4e.js"></script> |
| <script src="/assets/js/main.6ee83347.js"></script> |
| </body> |
| </html> |