| <!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>The Heron Streamlet API for Scala · Apache Heron</title><meta name="viewport" content="width=device-width"/><meta name="generator" content="Docusaurus"/><meta name="description" content="<!--"/><meta name="docsearch:version" content="0.20.4-incubating"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="The Heron Streamlet API for Scala · Apache Heron"/><meta property="og:type" content="website"/><meta property="og:url" content="https://heron.apache.org/"/><meta property="og:description" content="<!--"/><meta property="og:image" content="https://heron.apache.org/img/undraw_online.svg"/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://heron.apache.org/img/undraw_tweetstorm.svg"/><link rel="shortcut icon" href="/img/favicon-32x32.png"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/default.min.css"/><link rel="alternate" type="application/atom+xml" href="https://heron.apache.org/blog/atom.xml" title="Apache Heron Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://heron.apache.org/blog/feed.xml" title="Apache Heron Blog RSS Feed"/><script> |
| (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){ |
| (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o), |
| m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m) |
| })(window,document,'script','https://www.google-analytics.com/analytics.js','ga'); |
| |
| ga('create', 'UA-198017384-1', 'auto'); |
| ga('send', 'pageview'); |
| </script><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="/js/custom.js"></script><script type="text/javascript" src="/js/fix-location.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/"><img class="logo" src="/img/HeronTextLogo-small.png" alt="Apache Heron"/><h2 class="headerTitleWithLogo">Apache Heron</h2></a><a href="/versions"><h3>0.20.4-incubating</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class=""><a href="/api/java" target="_self">Javadocs</a></li><li class=""><a href="/api/python" target="_self">Pydocs</a></li><li class="siteNavGroupActive"><a href="/docs/0.20.4-incubating/getting-started-local-single-node" target="_self">Docs</a></li><li class=""><a href="/download" target="_self">Downloads</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#apache" target="_self">Apache</a></li></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="container docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i>›</i><span>Topology Development APIs</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Getting Started</h3><ul class=""><li class="navListItem"><a class="navItem" 
href="/docs/0.20.4-incubating/getting-started-local-single-node">Local (Single Node)</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/getting-started-migrate-storm-topologies">Migrate Storm Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/getting-started-docker">Heron & Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/getting-started-troubleshooting-guide">Troubleshooting Guide</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/deployment-overview">Deployment Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/deployment-configuration">Configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/deployment-api-server">The Heron API Server</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Topology Development APIs</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/topology-development-streamlet-api">The Heron Streamlet API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/topology-development-eco-api">The ECO API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/topology-development-topology-api-java">The Heron Topology API for Java</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/topology-development-topology-api-python">The Heron Topology API for Python</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/0.20.4-incubating/topology-development-streamlet-scala">The Heron Streamlet API for Scala</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client API Docs</h3><ul class=""><li class="navListItem"><a class="navItem" 
href="/docs/0.20.4-incubating/client-api-docs-overview">Client API Docs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Guides</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-effectively-once-java-topologies">Effectively Once Java Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-data-model">Heron Data Model</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-tuple-serialization">Tuple Serialization</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-ui-guide">Heron UI Guide</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-topology-tuning">Topology Tuning Guide</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-packing-algorithms">Packing Algorithms</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-simulator-mode">Simulator Mode</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/guides-troubeshooting-guide">Topology Troubleshooting Guide</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Heron Concepts</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/heron-design-goals">Heron Design Goals</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/heron-topology-concepts">Heron Topologies</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/heron-streamlet-concepts">Heron Streamlets</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/heron-architecture">Heron Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/heron-delivery-semantics">Heron Delivery Semantics</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">State 
Managers</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/state-managers-zookeeper">Zookeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/state-managers-local-fs">Local File System</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Uploaders</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/uploaders-local-fs">Local File System</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/uploaders-hdfs">HDFS</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/uploaders-http">HTTP</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/uploaders-amazon-s3">Amazon S3</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/uploaders-scp">Secure Copy (SCP)</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Schedulers</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-k8s-by-hand">Kubernetes by hand</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-k8s-with-helm">Kubernetes with Helm</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-aurora-cluster">Aurora Cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-aurora-local">Aurora Locally</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-local">Local Cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-nomad">Nomad</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-mesos-local-mac">Mesos Cluster Locally</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/schedulers-slurm">Slurm Cluster</a></li><li class="navListItem"><a 
class="navItem" href="/docs/0.20.4-incubating/schedulers-yarn">YARN Cluster</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cluster Configuration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/cluster-config-overview">Cluster Config Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/cluster-config-system-level">System Level Configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/cluster-config-instance">Heron Instance</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/cluster-config-metrics">Metrics Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/cluster-config-stream">Stream Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/cluster-config-tmanager">Topology Manager</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Observability</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/observability-prometheus">Prometheus</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/observability-graphite">Graphite</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/observability-scribe">Scribe</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">User Manuals</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/user-manuals-heron-cli">Heron Client</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/user-manuals-heron-explorer">Heron Explorer</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/user-manuals-tracker-rest">Heron Tracker REST API</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/user-manuals-heron-tracker-runbook">Heron Tracker Runbook</a></li><li 
class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/user-manuals-heron-ui-runbook">Heron UI Runbook</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/user-manuals-heron-shell">Heron Shell</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Compiling</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/compiling-overview">Compiling Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/compiling-linux">Compiling on Linux</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/compiling-osx">Compiling on OS X</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/compiling-docker">Compiling With Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/compiling-running-tests">Running Tests</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/compiling-code-organization">Code Organization</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Extending Heron</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/extending-heron-scheduler">Custom Scheduler</a></li><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/extending-heron-metric-sink">Custom Metrics Sink</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Heron Resources</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/0.20.4-incubating/heron-resources-resources">Heron Resources</a></li></ul></div></div></section></div><script> |
| var coll = document.getElementsByClassName('collapsible'); |
| var checkActiveCategory = true; |
| for (var i = 0; i < coll.length; i++) { |
| var links = coll[i].nextElementSibling.getElementsByTagName('*'); |
| if (checkActiveCategory){ |
| for (var j = 0; j < links.length; j++) { |
| if (links[j].classList.contains('navListItemActive')){ |
| coll[i].nextElementSibling.classList.toggle('hide'); |
| coll[i].childNodes[1].classList.toggle('rotate'); |
| checkActiveCategory = false; |
| break; |
| } |
| } |
| } |
| |
| coll[i].addEventListener('click', function() { |
| var arrow = this.childNodes[1]; |
| arrow.classList.toggle('rotate'); |
| var content = this.nextElementSibling; |
| content.classList.toggle('hide'); |
| }); |
| } |
| |
| document.addEventListener('DOMContentLoaded', function() { |
| createToggler('#navToggler', '#docsNav', 'docsSliderActive'); |
| createToggler('#tocToggler', 'body', 'tocActive'); |
| |
| var headings = document.querySelector('.toc-headings'); |
| headings && headings.addEventListener('click', function(event) { |
| var el = event.target; |
| while(el !== headings){ |
| if (el.tagName === 'A') { |
| document.body.classList.remove('tocActive'); |
| break; |
| } else{ |
| el = el.parentNode; |
| } |
| } |
| }, false); |
| |
| function createToggler(togglerSelector, targetSelector, className) { |
| var toggler = document.querySelector(togglerSelector); |
| var target = document.querySelector(targetSelector); |
| |
| if (!toggler) { |
| return; |
| } |
| |
| toggler.onclick = function(event) { |
| event.preventDefault(); |
| |
| target.classList.toggle(className); |
| }; |
| } |
| }); |
| </script></nav></div><div class="container mainContainer"><div class="wrapper"><div class="post"><header class="postHeader"><h1 class="postHeaderTitle">The Heron Streamlet API for Scala</h1></header><article><div><span><!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| http://www.apache.org/licenses/LICENSE-2.0 |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| <h2><a class="anchor" aria-hidden="true" id="getting-started"></a><a href="#getting-started" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Getting started</h2> |
<p>To use the Heron Streamlet API for Scala, you need to add the <code>heron-api</code> library to your project.</p>
| <h3><a class="anchor" aria-hidden="true" id="maven-setup"></a><a href="#maven-setup" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Maven setup</h3> |
<p>To use the <code>heron-api</code> library with Maven, add the following to the <code>dependencies</code> block of your <code>pom.xml</code> file:</p>
| <pre><code class="hljs css language-xml"><span class="hljs-tag"><<span class="hljs-name">dependency</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">groupId</span>></span>org.apache.heron<span class="hljs-tag"></<span class="hljs-name">groupId</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">artifactId</span>></span>heron-api<span class="hljs-tag"></<span class="hljs-name">artifactId</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">version</span>></span>{{<span class="hljs-tag">< <span class="hljs-attr">heronVersion</span> ></span>}}<span class="hljs-tag"></<span class="hljs-name">version</span>></span> |
| <span class="hljs-tag"></<span class="hljs-name">dependency</span>></span> |
| </code></pre> |
| <h4><a class="anchor" aria-hidden="true" id="compiling-a-jar-with-dependencies"></a><a href="#compiling-a-jar-with-dependencies" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Compiling a JAR with dependencies</h4> |
<p>To run a Scala topology built with the Heron Streamlet API on a Heron cluster, you need to package it as a "fat" JAR that includes its dependencies. You can use the <a href="https://maven.apache.org/plugins/maven-assembly-plugin/usage.html">Maven Assembly Plugin</a> to generate such a JAR. To add the plugin and bind its <code>single</code> goal to the <code>package</code> phase, add the following to the <code>plugins</code> block of your <code>pom.xml</code>:</p>
| <pre><code class="hljs css language-xml"><span class="hljs-tag"><<span class="hljs-name">plugin</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">artifactId</span>></span>maven-assembly-plugin<span class="hljs-tag"></<span class="hljs-name">artifactId</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">configuration</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">descriptorRefs</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">descriptorRef</span>></span>jar-with-dependencies<span class="hljs-tag"></<span class="hljs-name">descriptorRef</span>></span> |
| <span class="hljs-tag"></<span class="hljs-name">descriptorRefs</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">archive</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">manifest</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">mainClass</span>></span><span class="hljs-tag"></<span class="hljs-name">mainClass</span>></span> |
| <span class="hljs-tag"></<span class="hljs-name">manifest</span>></span> |
| <span class="hljs-tag"></<span class="hljs-name">archive</span>></span> |
| <span class="hljs-tag"></<span class="hljs-name">configuration</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">executions</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">execution</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">id</span>></span>make-assembly<span class="hljs-tag"></<span class="hljs-name">id</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">phase</span>></span>package<span class="hljs-tag"></<span class="hljs-name">phase</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">goals</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">goal</span>></span>single<span class="hljs-tag"></<span class="hljs-name">goal</span>></span> |
| <span class="hljs-tag"></<span class="hljs-name">goals</span>></span> |
| <span class="hljs-tag"></<span class="hljs-name">execution</span>></span> |
| <span class="hljs-tag"></<span class="hljs-name">executions</span>></span> |
| <span class="hljs-tag"></<span class="hljs-name">plugin</span>></span> |
| </code></pre> |
<p>Because the configuration above binds the plugin's <code>single</code> goal to the <code>package</code> phase, you can build the JAR with dependencies using the standard Maven <code>package</code> command:</p>
<pre><code class="hljs css language-bash">$ mvn package
| </code></pre> |
<p>By default, this creates a JAR named <code>ARTIFACT-ID-VERSION-jar-with-dependencies.jar</code> in your project's <code>target</code> folder. Here's an example that builds the JAR and then submits the topology to the <code>local</code> cluster:</p>
<pre><code class="hljs css language-bash">$ mvn package
| $ heron submit <span class="hljs-built_in">local</span> \ |
| target/my-project-1.2.3-jar-with-dependencies.jar \ |
| com.example.Main \ |
| MyTopology arg1 arg2 |
| </code></pre> |
| <h2><a class="anchor" aria-hidden="true" id="streamlet-api-topology-configuration"></a><a href="#streamlet-api-topology-configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Streamlet API topology configuration</h2> |
| <p>Every Streamlet API topology needs to be configured using a <code>Config</code> object. Here's an example default configuration:</p> |
<pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.<span class="hljs-type">Config</span>
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.scala.{<span class="hljs-type">Builder</span>, <span class="hljs-type">Runner</span>}

<span class="hljs-keyword">val</span> topologyConfig = <span class="hljs-type">Config</span>.defaultConfig()

<span class="hljs-comment">// Define the processing graph (sources, operations, sinks) on this builder</span>
<span class="hljs-keyword">val</span> topologyBuilder = <span class="hljs-type">Builder</span>.newBuilder()

<span class="hljs-comment">// Submit the graph under the given name, using topologyConfig</span>
<span class="hljs-keyword">val</span> topologyRunner = <span class="hljs-keyword">new</span> <span class="hljs-type">Runner</span>()
topologyRunner.run(<span class="hljs-string">"name-for-topology"</span>, topologyConfig, topologyBuilder)
| </code></pre> |
<p>The table below lists the configurable parameters for Streamlet API topologies and their default values:</p>
| <table> |
| <thead> |
| <tr><th style="text-align:left">Parameter</th><th style="text-align:left">Default</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><a href="#delivery-semantics">Delivery semantics</a></td><td style="text-align:left">At most once</td></tr> |
| <tr><td style="text-align:left">Serializer</td><td style="text-align:left"><a href="https://github.com/EsotericSoftware/kryo">Kryo</a></td></tr> |
<tr><td style="text-align:left">Total number of containers</td><td style="text-align:left">2</td></tr>
| <tr><td style="text-align:left">Per-container CPU</td><td style="text-align:left">1.0</td></tr> |
| <tr><td style="text-align:left">Per-container RAM</td><td style="text-align:left">100 MB</td></tr> |
| </tbody> |
| </table> |
| <p>Here's an example non-default configuration:</p> |
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> topologyConfig = <span class="hljs-type">Config</span>.newBuilder() |
| .setNumContainers(<span class="hljs-number">5</span>) |
| .setPerContainerRamInGigabytes(<span class="hljs-number">10</span>) |
| .setPerContainerCpu(<span class="hljs-number">3.5</span>f) |
| .setDeliverySemantics(<span class="hljs-type">Config</span>.<span class="hljs-type">DeliverySemantics</span>.<span class="hljs-type">EFFECTIVELY_ONCE</span>) |
| .setSerializer(<span class="hljs-type">Config</span>.<span class="hljs-type">Serializer</span>.<span class="hljs-type">JAVA</span>) |
| .setUserConfig(<span class="hljs-string">"some-key"</span>, <span class="hljs-string">"some-value"</span>) |
| .build() |
| </code></pre> |
| <h3><a class="anchor" aria-hidden="true" id="delivery-semantics"></a><a href="#delivery-semantics" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Delivery semantics</h3> |
<p>You can apply <a href="/docs/0.20.4-incubating/heron-delivery-semantics">delivery semantics</a> to a Streamlet API topology by setting them on the <code>Config</code> builder:</p>
<pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> topologyConfig = <span class="hljs-type">Config</span>.newBuilder()
  .setDeliverySemantics(<span class="hljs-type">Config</span>.<span class="hljs-type">DeliverySemantics</span>.<span class="hljs-type">EFFECTIVELY_ONCE</span>)
  .build()
| </code></pre> |
| <p>The other available options in the <code>DeliverySemantics</code> enum are <code>ATMOST_ONCE</code> and <code>ATLEAST_ONCE</code>.</p> |
| <h2><a class="anchor" aria-hidden="true" id="streamlets"></a><a href="#streamlets" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Streamlets</h2> |
<p>In the Heron Streamlet API for Scala, processing graphs consist of <a href="/docs/0.20.4-incubating/heron-streamlet-concepts">streamlets</a>. One or more source streamlets inject data into your graph, and downstream operations then process that data.</p>
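<p>Streamlet operations run over unbounded streams, but their per-element semantics can be illustrated with ordinary Scala collections. The following sketch is a plain-Scala model that does <em>not</em> use the Heron API: a finite <code>List</code> stands in for a streamlet, and a deterministic counter stands in for the supplier function that a source calls once per element. The names <code>StreamletSemanticsSketch</code> and <code>fromSupplier</code> are hypothetical, and the windowed count uses a count-based tumbling window rather than a time-based one so that the result is deterministic.</p>
<pre><code class="hljs css language-scala">// Plain-Scala model of streamlet semantics; this does NOT use the Heron API.
// A finite List stands in for an unbounded streamlet.
object StreamletSemanticsSketch {
  // Hypothetical helper: a source calls its supplier once per element.
  def fromSupplier[R](supplier: () => R, n: Int): List[R] =
    List.fill(n)(supplier())

  // Deterministic supplier so the output is predictable: 1, 2, 3, ...
  private val counter = Iterator.from(1)
  val numbers: List[Int] = fromSupplier(() => counter.next(), 5)

  // map: exactly one output element per input element
  val plusOne: List[Int] = numbers.map(_ + 1)

  // filter: keep only the elements that satisfy the predicate
  val evens: List[Int] = numbers.filter(_ % 2 == 0)

  // flatMap: the function returns a collection for each element, and those
  // collections are flattened into a single stream of words
  val sentences: List[String] = List("the quick fox", "the lazy dog")
  val words: List[String] = sentences.flatMap(_.split(" ").toList)

  // union: all elements of both inputs, unmodified
  val unioned: List[Int] = evens ++ plusOne

  // countByKey (totals only): number of occurrences of each word
  val wordCounts: Map[String, Int] =
    words.groupBy(identity).map { case (word, ws) => word -> ws.size }

  // Windowed count over a tumbling count window of 3 elements:
  // each consecutive group of 3 words is counted independently
  val windowedCounts: List[Map[String, Int]] =
    words.grouped(3).toList.map(w => w.groupBy(identity).map { case (k, v) => k -> v.size })

  def main(args: Array[String]): Unit = {
    println(s"map: $plusOne")
    println(s"filter: $evens")
    println(s"flatMap: $words")
    println(s"union: $unioned")
    println(s"countByKey: $wordCounts")
    println(s"windowed counts: $windowedCounts")
  }
}
</code></pre>
<p>The ordering in this sketch is deterministic only because the input is a finite list. In a running topology, elements from different partitions, or from the two inputs of a union, may arrive interleaved, and a real streamlet never "finishes", so the totals shown here correspond to a snapshot rather than a final result.</p>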
| <h2><a class="anchor" aria-hidden="true" id="operations"></a><a href="#operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Operations</h2> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Operation</th><th style="text-align:left">Description</th><th style="text-align:left">Example</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><a href="#map-operations"><code>map</code></a></td><td style="text-align:left">Create a new streamlet by applying the supplied mapping function to each element in the original streamlet</td><td style="text-align:left">Add 1 to each element in a streamlet of integers</td></tr> |
<tr><td style="text-align:left"><a href="#flatmap-operations"><code>flatMap</code></a></td><td style="text-align:left">Like a map operation, except that the supplied function returns a collection for each element and those collections are flattened into a single streamlet</td><td style="text-align:left">Flatten a sentence into individual words</td></tr>
| <tr><td style="text-align:left"><a href="#filter-operations"><code>filter</code></a></td><td style="text-align:left">Create a new streamlet containing only the elements that satisfy the supplied filtering function</td><td style="text-align:left">Remove all inappropriate words from a streamlet of strings</td></tr> |
<tr><td style="text-align:left"><a href="#union-operations"><code>union</code></a></td><td style="text-align:left">Unifies two streamlets into one, without modifying the elements of the two streamlets</td><td style="text-align:left">Unite two different <code>Streamlet[String]</code>s into a single streamlet</td></tr>
<tr><td style="text-align:left"><a href="#clone-operations"><code>clone</code></a></td><td style="text-align:left">Creates any number of identical copies of a streamlet</td><td style="text-align:left">Create three separate streamlets from the same source</td></tr>
<tr><td style="text-align:left"><a href="#transform-operations"><code>transform</code></a></td><td style="text-align:left">Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations)</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><a href="#join-operations"><code>join</code></a></td><td style="text-align:left">Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key. Supported join types: inner (the default), outer left, outer right, and outer.</td><td style="text-align:left">Combine key-value pairs listing current scores (e.g. <code>("h4x0r", 127)</code>) for each user into a single per-user stream</td></tr>
<tr><td style="text-align:left"><a href="#key-by-operations"><code>keyBy</code></a></td><td style="text-align:left">Returns a new key-value streamlet by applying the supplied key and value extractors to each element in the original streamlet</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><a href="#reduce-by-key-operations"><code>reduceByKey</code></a></td><td style="text-align:left">Produces a key-value streamlet that, for each key, combines all accumulated values using the supplied reduce function</td><td style="text-align:left">Count the number of times a value has been encountered</td></tr>
<tr><td style="text-align:left"><a href="#reduce-by-key-and-window-operations"><code>reduceByKeyAndWindow</code></a></td><td style="text-align:left">Produces a key-value streamlet that, for each key within a time window, combines all accumulated values using the supplied reduce function</td><td style="text-align:left">Count the number of times a value has been encountered within a specified time window</td></tr>
<tr><td style="text-align:left"><a href="#count-by-key-operations"><code>countByKey</code></a></td><td style="text-align:left">A special reduce operation that counts the number of tuples for each key</td><td style="text-align:left">Count the number of times a value has been encountered</td></tr>
<tr><td style="text-align:left"><a href="#count-by-key-and-window-operations"><code>countByKeyAndWindow</code></a></td><td style="text-align:left">A special reduce operation that counts the number of tuples for each key within a time window</td><td style="text-align:left">Count the number of times a value has been encountered within a specified time window</td></tr>
<tr><td style="text-align:left"><a href="#split-operations"><code>split</code></a></td><td style="text-align:left">Splits a streamlet into multiple streams, each with a different stream ID</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><a href="#with-stream-operations"><code>withStream</code></a></td><td style="text-align:left">Selects the stream with a given ID from a streamlet that contains multiple streams</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><a href="#apply-operator-operations"><code>applyOperator</code></a></td><td style="text-align:left">Returns a new streamlet by applying a user-defined operator to the original streamlet</td><td style="text-align:left">Apply an existing bolt as an operator</td></tr>
| <tr><td style="text-align:left"><a href="#repartition-operations"><code>repartition</code></a></td><td style="text-align:left">Create a new streamlet by applying a new parallelism level to the original streamlet</td><td style="text-align:left">Increase the parallelism of a streamlet from 5 to 10</td></tr> |
| <tr><td style="text-align:left"><a href="#sink-operations"><code>toSink</code></a></td><td style="text-align:left">Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc.</td><td style="text-align:left">Store processing graph results in an AWS Redshift table</td></tr> |
| <tr><td style="text-align:left"><a href="#log-operations"><code>log</code></a></td><td style="text-align:left">Logs the final results of a processing graph to stdout. This <em>must</em> be the last step in the graph.</td></tr> |
| <tr><td style="text-align:left"><a href="#consume-operations"><code>consume</code></a></td><td style="text-align:left">Consume operations are like sink operations except they don't require implementing a full sink interface (consume operations are thus suited for simple operations like logging)</td><td style="text-align:left">Log processing graph results using a custom formatting function</td></tr> |
| </tbody> |
| </table> |
| <h3><a class="anchor" aria-hidden="true" id="map-operations"></a><a href="#map-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Map operations</h3> |
| <p>Map operations create a new streamlet by applying the supplied mapping function to each element in the original streamlet. Here's an example:</p> |
| <pre><code class="hljs css language-scala">builder.newSource(() => <span class="hljs-number">1</span>) |
  .map[<span class="hljs-type">Int</span>]((i: <span class="hljs-type">Int</span>) => i + <span class="hljs-number">12</span>) <span class="hljs-comment">// or, using placeholder syntax: .map[Int](_ + 12)</span>
| </code></pre> |
| <p>In this example, a supplier streamlet emits an indefinite series of 1s. The <code>map</code> operation then adds 12 to each incoming element, producing a streamlet of 13s.</p> |
| <h3><a class="anchor" aria-hidden="true" id="flatmap-operations"></a><a href="#flatmap-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>FlatMap operations</h3> |
<p>FlatMap operations are like <code>map</code> operations, with the important difference that the supplied function returns a collection for each element, and those collections are then "flattened" into the new streamlet. Each input element can therefore produce zero or more output elements. In this example, a supplier streamlet emits the same sentence over and over again; the <code>flatMap</code> operation splits each sentence into individual words and emits each word as a separate element:</p>
| <pre><code class="hljs css language-scala">builder.newSource(() => <span class="hljs-string">"I have nothing to declare but my genius"</span>) |
| .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>)) |
| </code></pre> |
<p>The effect of this operation is to transform a <code>Streamlet[String]</code> of sentences into a <code>Streamlet[String]</code> of individual words.</p>
| <blockquote> |
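<p>The core difference between <code>map</code> and <code>flatMap</code> operations is that <code>map</code> produces exactly one output element per input element, whereas <code>flatMap</code> produces zero or more output elements per input element, because the collection returned for each element is flattened.</p>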
| </blockquote> |
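<p>To make the difference concrete, here is a minimal sketch that uses ordinary Scala collections rather than the Heron API. It assumes that a streamlet's <code>map</code> and <code>flatMap</code> follow the same per-element semantics as their collection counterparts; the sentences are illustrative data only.</p>
<pre><code class="hljs css language-scala">// Plain Scala collections (not Heron streamlets), used only to show the shape of the results
val sentences = List("I have nothing to declare", "but my genius")

// map: exactly one output element per input element (here, one array of words per sentence)
val mapped: List[Array[String]] = sentences.map(_.split(" "))

// flatMap: the per-sentence arrays are flattened into a single list of words
val flatMapped: List[String] = sentences.flatMap(_.split(" "))

println(mapped.map(_.length)) // List(5, 3)
println(flatMapped)           // List(I, have, nothing, to, declare, but, my, genius)
</code></pre>
<p>Two sentences go in; <code>map</code> yields two arrays, while <code>flatMap</code> yields eight individual words.</p>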
| <h3><a class="anchor" aria-hidden="true" id="filter-operations"></a><a href="#filter-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Filter operations</h3> |
<p>Filter operations retain only those elements of a streamlet for which the supplied filtering function returns <code>true</code>; all other elements are dropped. Here's an example:</p>
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> java.util.concurrent.<span class="hljs-type">ThreadLocalRandom</span> |
| |
| builder.newSource(() => <span class="hljs-type">ThreadLocalRandom</span>.current().nextInt(<span class="hljs-number">1</span>, <span class="hljs-number">11</span>)) |
  .filter(_ < <span class="hljs-number">7</span>)
| </code></pre> |
<p>In this example, a source streamlet consisting of random integers between 1 and 10 is modified by a filter operation that keeps only the elements lower than 7 and removes all others (7 through 10).</p>
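<p>Because a filter keeps the elements for which the predicate holds, it can help to check the predicate on a small, finite input first. The sketch below does this with a plain Scala list standing in for the random source (an assumption made so the result is deterministic); it is not Heron code.</p>
<pre><code class="hljs css language-scala">// Plain Scala stand-in for the random source: every value from 1 to 10
val values = (1 to 10).toList

// Same predicate as the streamlet example: an element is kept only if it is below 7
val kept = values.filter(_ < 7)
val dropped = values.filterNot(_ < 7)

println(kept)    // List(1, 2, 3, 4, 5, 6)
println(dropped) // List(7, 8, 9, 10)
</code></pre>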
| <h3><a class="anchor" aria-hidden="true" id="union-operations"></a><a href="#union-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Union operations</h3> |
| <p>Union operations combine two streamlets of the same type into a single streamlet without modifying the elements. Here's an example:</p> |
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> flowers = builder.newSource(() => <span class="hljs-string">"flower"</span>) |
| <span class="hljs-keyword">val</span> butterflies = builder.newSource(() => <span class="hljs-string">"butterfly"</span>) |
| |
| <span class="hljs-keyword">val</span> combinedSpringStreamlet = flowers.union(butterflies) |
| </code></pre> |
<p>Here, one streamlet is an endless series of "flower" elements while the other is an endless series of "butterfly" elements. The <code>union</code> operation combines them into a single streamlet containing both kinds of elements; the order in which the two are interleaved is not guaranteed.</p>
| <h3><a class="anchor" aria-hidden="true" id="clone-operations"></a><a href="#clone-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Clone operations</h3> |
| <p>Clone operations enable you to create any number of "copies" of a streamlet. Each of the "copy" streamlets contains all the elements of the original and can be manipulated just like the original streamlet. Here's an example:</p> |
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> scala.util.<span class="hljs-type">Random</span> |
| |
| <span class="hljs-keyword">val</span> integers = builder.newSource(() => <span class="hljs-type">Random</span>.nextInt(<span class="hljs-number">100</span>)) |
| |
<span class="hljs-comment">// clone returns a Seq of streamlets, so individual copies are accessed by index</span>
<span class="hljs-keyword">val</span> copies = integers.clone(<span class="hljs-number">5</span>)
<span class="hljs-keyword">val</span> ints1 = copies(<span class="hljs-number">0</span>)
<span class="hljs-keyword">val</span> ints2 = copies(<span class="hljs-number">1</span>)
<span class="hljs-keyword">val</span> ints3 = copies(<span class="hljs-number">2</span>)
| <span class="hljs-comment">// and so on...</span> |
| </code></pre> |
<p>In this example, a streamlet of random integers between 0 and 99 is cloned into 5 identical streamlets.</p>
| <h3><a class="anchor" aria-hidden="true" id="transform-operations"></a><a href="#transform-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Transform operations</h3> |
| <p>Transform operations are highly flexible operations that are most useful for:</p> |
| <ul> |
| <li>operations involving state in <a href="../../concepts/delivery-semantics#stateful-topologies">stateful topologies</a></li> |
<li>operations that don't fit neatly into the other categories or into lambda-based logic</li>
| </ul> |
| <p>Transform operations require you to implement three different methods:</p> |
| <ul> |
<li>A <code>setup</code> function that receives a context object and specifies what happens before the <code>transform</code> step</li>
<li>A <code>transform</code> function that performs the desired transformation on each element</li>
| <li>A <code>cleanup</code> function that allows you to specify what happens after the <code>transform</code> step</li> |
| </ul> |
| <p>The context object available to a transform operation provides access to:</p> |
| <ul> |
| <li>the current state of the topology</li> |
| <li>the topology's configuration</li> |
| <li>the name of the stream</li> |
| <li>the stream partition</li> |
| <li>the current task ID</li> |
| </ul> |
| <p>Here's a Scala example of a transform operation in a topology where a stateful record is kept of the number of items processed:</p> |
<pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> java.util.concurrent.atomic.<span class="hljs-type">AtomicLong</span>

<span class="hljs-keyword">import</span> org.apache.heron.streamlet.<span class="hljs-type">Context</span>
<span class="hljs-keyword">import</span> org.apache.heron.streamlet.scala.<span class="hljs-type">SerializableTransformer</span>

<span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">CountNumberOfItems</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">SerializableTransformer</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>] </span>{
  <span class="hljs-keyword">private</span> <span class="hljs-keyword">val</span> numberOfItems = <span class="hljs-keyword">new</span> <span class="hljs-type">AtomicLong</span>()

  <span class="hljs-comment">// Called once before any element is processed: store the counter in the state</span>
  <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">setup</span></span>(context: <span class="hljs-type">Context</span>): <span class="hljs-type">Unit</span> =
    context.getState().put(<span class="hljs-string">"number-of-items"</span>, numberOfItems)

  <span class="hljs-comment">// Called for each element: count it, then emit its upper-cased version via f</span>
  <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">transform</span></span>(i: <span class="hljs-type">String</span>, f: <span class="hljs-type">String</span> => <span class="hljs-type">Unit</span>): <span class="hljs-type">Unit</span> = {
    numberOfItems.incrementAndGet()
    f(i.toUpperCase)
  }

  <span class="hljs-comment">// Called once when the operation shuts down</span>
  <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">cleanup</span></span>(): <span class="hljs-type">Unit</span> =
    println(<span class="hljs-string">s"Successfully processed new state: <span class="hljs-subst">$numberOfItems</span>"</span>)
}
</code></pre>
<p>This operation does a few things:</p>
<ul>
<li>In the <code>setup</code> method, the <a href="/api/java/org/apache/heron/streamlet/Context.html"><code>Context</code></a> object is used to access the current state (which has the semantics of a Java <code>Map</code>), and the item counter is stored in it under the key <code>number-of-items</code>.</li>
<li>In the <code>transform</code> method, the counter is incremented by one for each incoming string, and the string is converted to upper case and passed to the function <code>f</code>, which "accepts" it as the new value.</li>
<li>In the <code>cleanup</code> step, the current count of items processed is logged.</li>
</ul>
| <p>Here's that operation within the context of a streamlet processing graph:</p> |
<pre><code class="hljs css language-scala">builder.newSource(() => <span class="hljs-string">"Some string over and over"</span>)
| .transform(<span class="hljs-keyword">new</span> <span class="hljs-type">CountNumberOfItems</span>()) |
| .log() |
| </code></pre> |
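<p>The order in which the three methods run can be illustrated without a cluster. The sketch below is an assumption-laden model, not Heron code: <code>FakeContext</code>, <code>Transformer</code>, <code>UpperCaseCounter</code>, and <code>runTransformer</code> are hypothetical names, and a mutable map stands in for the topology state. It assumes that <code>setup</code> runs once, <code>transform</code> runs once per element, and <code>cleanup</code> runs once at the end.</p>
<pre><code class="hljs css language-scala">import scala.collection.mutable

// Hypothetical stand-in for the Heron Context: only the state map is modeled
final class FakeContext {
  val state: mutable.Map[String, Long] = mutable.Map.empty
}

// Hypothetical trait with the same three-method shape as SerializableTransformer
trait Transformer[I, O] {
  def setup(context: FakeContext): Unit
  def transform(i: I, f: O => Unit): Unit
  def cleanup(): Unit
}

class UpperCaseCounter extends Transformer[String, String] {
  private var context: FakeContext = _
  private var count = 0L

  override def setup(context: FakeContext): Unit = {
    this.context = context
    context.state("number-of-items") = count
  }

  override def transform(i: String, f: String => Unit): Unit = {
    count += 1
    context.state("number-of-items") = count // record the new count in the state
    f(i.toUpperCase)                         // emit the transformed value
  }

  override def cleanup(): Unit = println(s"Processed $count items")
}

// Hypothetical driver: setup once, transform per element, cleanup at the end
def runTransformer[I, O](t: Transformer[I, O], input: Seq[I], ctx: FakeContext): Seq[O] = {
  val out = mutable.ArrayBuffer.empty[O]
  t.setup(ctx)
  input.foreach(i => t.transform(i, o => out += o))
  t.cleanup()
  out.toList
}

val ctx = new FakeContext
val output = runTransformer(new UpperCaseCounter, Seq("a", "b", "c"), ctx)
println(output)                       // List(A, B, C)
println(ctx.state("number-of-items")) // 3
</code></pre>
<p>In a real topology the Heron runtime, not your code, drives these calls; the driver here only makes the lifecycle visible.</p>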
| <h3><a class="anchor" aria-hidden="true" id="join-operations"></a><a href="#join-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Join operations</h3> |
| <blockquote> |
| <p>For a more in-depth conceptual discussion of joins, see the <a href="../../../concepts/streamlet-api#join-operations">Heron Streamlet API</a> doc.</p> |
| </blockquote> |
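<p>Join operations unify two streamlets <em>on a key</em>. When a <code>join</code> operation is added to a processing graph, you supply the other streamlet, a key extractor for each of the two streamlets, a window configuration, and a join function that combines each pair of elements whose keys match within a window. Here's an example in which both streamlets consist of <code>KeyValue</code> objects:</p>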
<pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.{<span class="hljs-type">KeyValue</span>, <span class="hljs-type">WindowConfig</span>}
| <span class="hljs-keyword">import</span> org.apache.heron.streamlet.scala.<span class="hljs-type">Builder</span> |
| |
| <span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder() |
| |
| <span class="hljs-keyword">val</span> streamlet1 = builder |
| .newSource(() => |
| <span class="hljs-keyword">new</span> <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>](<span class="hljs-string">"heron-api"</span>, <span class="hljs-string">"topology-api"</span>)) |
| .setName(<span class="hljs-string">"streamlet1"</span>) |
| |
| <span class="hljs-keyword">val</span> streamlet2 = builder |
| .newSource(() => |
| <span class="hljs-keyword">new</span> <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>](<span class="hljs-string">"heron-api"</span>, <span class="hljs-string">"streamlet-api"</span>)) |
| .setName(<span class="hljs-string">"streamlet2"</span>) |
| |
streamlet1.join[<span class="hljs-type">String</span>, <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>], <span class="hljs-type">String</span>](
  streamlet2,
  <span class="hljs-comment">// Key extractors: join the two streamlets on each KeyValue's key</span>
  (kv: <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>]) => kv.getKey,
  (kv: <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>]) => kv.getKey,
  <span class="hljs-comment">// Window configuration</span>
  <span class="hljs-type">WindowConfig</span>.<span class="hljs-type">TumblingCountWindow</span>(<span class="hljs-number">10</span>),
  <span class="hljs-comment">// Join function: combine the values of each matching pair</span>
  (kv1: <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>], kv2: <span class="hljs-type">KeyValue</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>]) =>
    kv1.getValue + <span class="hljs-string">" - "</span> + kv2.getValue
| ) |
| </code></pre> |
<p>In this case, the resulting streamlet is an indefinite stream of key-value pairs whose values are the strings produced by the join function (<code>topology-api - streamlet-api</code>) and whose keys combine the join key with information about the window in which the match occurred.</p>
| <blockquote> |
<p>A <code>join</code> operation produces a single new streamlet in which, for each window, elements from the two streamlets that share a key are combined by the join function.</p>
| </blockquote> |
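<p>The pairing step of an inner join can be sketched with plain Scala tuples. This is a model of what is assumed to happen inside a single window, not the Heron implementation: every left element is combined with every right element that has the same key, and elements without a match (such as the hypothetical <code>"other"</code> key below) produce no output.</p>
<pre><code class="hljs css language-scala">// Contents of one window from each side, modeled as (key, value) tuples
val left  = List("heron-api" -> "topology-api", "other" -> "x")
val right = List("heron-api" -> "streamlet-api")

// Inner join: pair every left element with every right element that has the same key
val joined: List[(String, String)] = for {
  (k1, v1) <- left
  (k2, v2) <- right
  if k1 == k2
} yield k1 -> (v1 + " - " + v2)

println(joined) // List((heron-api,topology-api - streamlet-api))
</code></pre>
<p>The outer join types differ only in how unmatched elements are treated; in this inner-join sketch they are simply dropped.</p>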
| <h3><a class="anchor" aria-hidden="true" id="key-by-operations"></a><a href="#key-by-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Key by operations</h3> |
| <p>Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet. Here is an example:</p> |
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder() |
| |
| builder |
| .newSource(() => <span class="hljs-string">"Paco de Lucia is one of the most popular virtuoso"</span>) |
| <span class="hljs-comment">// Convert each sentence into individual words</span> |
| .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>)) |
| .keyBy[<span class="hljs-type">String</span>, <span class="hljs-type">Int</span>]( |
| <span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span> |
| (word: <span class="hljs-type">String</span>) => word, |
| <span class="hljs-comment">// Value extractor (get the length of each word)</span> |
| (word: <span class="hljs-type">String</span>) => word.length |
| ) |
| <span class="hljs-comment">// The result is logged</span> |
| .log(); |
| </code></pre> |
| <h3><a class="anchor" aria-hidden="true" id="reduce-by-key-operations"></a><a href="#reduce-by-key-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reduce by key operations</h3> |
| <p>You can apply <a href="https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html">reduce</a> operations to streamlets by specifying:</p> |
| <ul> |
| <li>a key extractor that determines what counts as the key for the streamlet</li> |
| <li>a value extractor that determines which final value is chosen for each element of the streamlet</li> |
| <li>a reduce function that produces a single value for each key in the streamlet</li> |
| </ul> |
<p>Reduce by key operations produce a new key-value streamlet in which each extracted key is paired with the value calculated so far by the reduce function for that key. Here's an example:</p>
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder() |
| |
| builder |
| .newSource(() => <span class="hljs-string">"Paco de Lucia is one of the most popular virtuoso"</span>) |
| <span class="hljs-comment">// Convert each sentence into individual words</span> |
| .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>)) |
| .reduceByKey[<span class="hljs-type">String</span>, <span class="hljs-type">Int</span>]( |
| <span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span> |
| (word: <span class="hljs-type">String</span>) => word, |
| <span class="hljs-comment">// Value extractor (each word appears only once, hence the value is always 1)</span> |
| (word: <span class="hljs-type">String</span>) => <span class="hljs-number">1</span>, |
| <span class="hljs-comment">// Reduce operation (a running sum)</span> |
| (x: <span class="hljs-type">Int</span>, y: <span class="hljs-type">Int</span>) => x + y) |
| <span class="hljs-comment">// The result is logged</span> |
| .log(); |
| </code></pre> |
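<p>Because the input is unbounded, a reduce by key cannot wait for a "final" value. The sketch below models the behavior assumed here: each incoming element updates its key's accumulated value, and the updated pair is emitted right away. <code>runningReduceByKey</code> is a hypothetical helper written in plain Scala, not part of the Heron API.</p>
<pre><code class="hljs css language-scala">import scala.collection.mutable

// Hypothetical helper that mimics a running reduce by key on a finite input
def runningReduceByKey[R, K, T](input: Seq[R])(
    keyFn: R => K, valueFn: R => T, reduceFn: (T, T) => T): Seq[(K, T)] = {
  // Accumulated value per key, updated as each element arrives
  val acc = mutable.Map.empty[K, T]
  input.map { r =>
    val k = keyFn(r)
    val v = acc.get(k) match {
      case Some(old) => reduceFn(old, valueFn(r))
      case None      => valueFn(r)
    }
    acc(k) = v
    // The updated (key, value) pair is emitted for every incoming element
    k -> v
  }
}

val words = "to be or not to be".split(" ").toList
val emitted = runningReduceByKey(words)(w => w, _ => 1, (x: Int, y: Int) => x + y)
println(emitted) // List((to,1), (be,1), (or,1), (not,1), (to,2), (be,2))
</code></pre>
<p>The second occurrences of "to" and "be" are emitted with a count of 2, so downstream steps see a running total rather than one value per key.</p>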
| <h3><a class="anchor" aria-hidden="true" id="reduce-by-key-and-window-operations"></a><a href="#reduce-by-key-and-window-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reduce by key and window operations</h3> |
| <p>You can apply <a href="https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html">reduce</a> operations to streamlets by specifying:</p> |
| <ul> |
| <li>a key extractor that determines what counts as the key for the streamlet</li> |
| <li>a value extractor that determines which final value is chosen for each element of the streamlet</li> |
| <li>a <a href="../../../concepts/topologies#window-operations">time window</a> across which the operation will take place</li> |
| <li>a reduce function that produces a single value for each key in the streamlet</li> |
| </ul> |
| <p>Reduce by key and window operations produce a new streamlet of key-value window objects (which include a key-value pair including the extracted key and calculated value, as well as information about the window in which the operation took place). Here's an example:</p> |
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.<span class="hljs-type">WindowConfig</span>; |
| |
| <span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder() |
| |
| builder |
| .newSource(() => <span class="hljs-string">"Paco de Lucia is one of the most popular virtuoso"</span>) |
| <span class="hljs-comment">// Convert each sentence into individual words</span> |
| .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>)) |
| .reduceByKeyAndWindow[<span class="hljs-type">String</span>, <span class="hljs-type">Int</span>]( |
| <span class="hljs-comment">// Key extractor (in this case, each word acts as the key)</span> |
| (word: <span class="hljs-type">String</span>) => word, |
| <span class="hljs-comment">// Value extractor (each word appears only once, hence the value is always 1)</span> |
| (word: <span class="hljs-type">String</span>) => <span class="hljs-number">1</span>, |
| <span class="hljs-comment">// Window configuration</span> |
| <span class="hljs-type">WindowConfig</span>.<span class="hljs-type">TumblingCountWindow</span>(<span class="hljs-number">50</span>), |
| <span class="hljs-comment">// Reduce operation (a running sum)</span> |
| (x: <span class="hljs-type">Int</span>, y: <span class="hljs-type">Int</span>) => x + y) |
| <span class="hljs-comment">// The result is logged</span> |
| .log(); |
| </code></pre> |
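<p>A tumbling count window can be modeled as cutting the input into consecutive, non-overlapping chunks of a fixed size and reducing by key inside each chunk independently. The sketch below assumes exactly that; <code>tumblingReduceByKey</code> is a hypothetical plain-Scala helper, and it uses a window of 3 elements (instead of 50) and an input whose length is a multiple of the window size so the result stays small.</p>
<pre><code class="hljs css language-scala">// Hypothetical helper: split the input into windows of `size` elements,
// then reduce by key inside each window independently of the others
def tumblingReduceByKey[R, K, T](input: Seq[R], size: Int)(
    keyFn: R => K, valueFn: R => T, reduceFn: (T, T) => T): Seq[Map[K, T]] =
  input.grouped(size).toList.map { window =>
    window.groupBy(keyFn).map { case (k, rs) => k -> rs.map(valueFn).reduce(reduceFn) }
  }

// Six words and a window of three elements give exactly two windows
val words = "to be to be or not".split(" ").toList
val perWindow = tumblingReduceByKey(words, 3)(w => w, _ => 1, (x: Int, y: Int) => x + y)
perWindow.foreach(println)
</code></pre>
<p>The first window counts "to" twice, while the count for "be" starts over in the second window: results never carry over from one window to the next.</p>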
| <h3><a class="anchor" aria-hidden="true" id="count-by-key-operations"></a><a href="#count-by-key-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Count by key operations</h3> |
| <p>Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered. Here's an example:</p> |
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder() |
| |
| builder |
| .newSource(() => <span class="hljs-string">"Paco de Lucia is one of the most popular virtuoso"</span>) |
| <span class="hljs-comment">// Convert each sentence into individual words</span> |
| .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>)) |
| <span class="hljs-comment">// Count the number of occurrences of each word</span> |
| .countByKey[<span class="hljs-type">String</span>]((word: <span class="hljs-type">String</span>) => word) |
| <span class="hljs-comment">// The result is logged</span> |
| .log(); |
| </code></pre> |
| <h3><a class="anchor" aria-hidden="true" id="count-by-key-and-window-operations"></a><a href="#count-by-key-and-window-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Count by key and window operations</h3> |
| <p>Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each <a href="../../../concepts/topologies#window-operations">time window</a>. Here's an example:</p> |
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder() |
| |
| builder |
| .newSource(() => <span class="hljs-string">"Paco de Lucia is one of the most popular virtuoso"</span>) |
| <span class="hljs-comment">// Convert each sentence into individual words</span> |
| .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>)) |
| <span class="hljs-comment">// Count the number of occurrences of each word within each time window</span> |
| .countByKeyAndWindow[<span class="hljs-type">String</span>]( |
| (word: <span class="hljs-type">String</span>) => word, |
| <span class="hljs-type">WindowConfig</span>.<span class="hljs-type">TumblingCountWindow</span>(<span class="hljs-number">50</span>)) |
| <span class="hljs-comment">// The result is logged</span> |
| .log(); |
| </code></pre> |
| <h3><a class="anchor" aria-hidden="true" id="split-operations"></a><a href="#split-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Split operations</h3> |
<p>Split operations split a streamlet into multiple streams, each identified by its own stream ID. You supply a map from each stream ID to a predicate, and each item in the original streamlet is emitted to every stream whose predicate returns true for it. Here is an example:</p>
<pre><code class="hljs css language-scala">val builder = Builder.newBuilder()

builder
  .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
  // Convert each sentence into individual words
  .flatMap[String](_.split(" "))
  // Route each word to the "long_word" or "short_word" stream
  .split(Map(
    "long_word" -> { word: String => word.length >= 4 },
    "short_word" -> { word: String => word.length < 4 }
  ))
  // Select only the stream of short words
  .withStream("short_word")
  // The result is logged
  .log()
</code></pre>
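<p>The routing rule can be checked on the example sentence with plain Scala collections. In this sketch (not Heron code), each stream is modeled as a list holding the words whose predicate is true, which is the behavior assumed for <code>split</code>; looking up one entry by its ID plays the role of <code>withStream</code>.</p>
<pre><code class="hljs css language-scala">// Same predicates as the split example, keyed by stream ID
val predicates: Map[String, String => Boolean] = Map(
  "long_word"  -> ((word: String) => word.length >= 4),
  "short_word" -> ((word: String) => word.length < 4)
)

val words = "Paco de Lucia is one of the most popular virtuoso".split(" ").toList

// One list of words per stream ID, preserving input order
val streams: Map[String, List[String]] =
  predicates.map { case (id, p) => id -> words.filter(p) }

// Selecting a single stream by its ID, as withStream("short_word") does
println(streams("short_word")) // List(de, is, one, of, the)
println(streams("long_word"))  // List(Paco, Lucia, most, popular, virtuoso)
</code></pre>
<p>Since these two predicates are mutually exclusive, every word lands in exactly one stream; overlapping predicates would send a word to several streams.</p>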
| <h3><a class="anchor" aria-hidden="true" id="with-stream-operations"></a><a href="#with-stream-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>With stream operations</h3> |
<p>With stream operations select the stream with a given ID from a streamlet that contains multiple streams. They are typically used after a <a href="#split-operations">split</a> operation.</p>
| <h3><a class="anchor" aria-hidden="true" id="apply-operator-operations"></a><a href="#apply-operator-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Apply operator operations</h3> |
| <p>Apply operator operations apply a user-defined operator (such as a bolt) to each element of the original streamlet and return a new streamlet. Here is an example:</p> |
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder() |
| |
| <span class="hljs-comment">// MyBolt stands for a user-defined bolt (not shown) that processes String elements</span> |
| <span class="hljs-keyword">private</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">MyBoltOperator</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">MyBolt</span></span> |
| <span class="hljs-keyword">with</span> <span class="hljs-type">IStreamletOperator</span>[<span class="hljs-type">String</span>, <span class="hljs-type">String</span>] { |
| } |
| |
| builder |
| .newSource(() => <span class="hljs-string">"Paco de Lucia is one of the most popular virtuoso"</span>) |
| <span class="hljs-comment">// Convert each sentence into individual words</span> |
| .flatMap[<span class="hljs-type">String</span>](_.split(<span class="hljs-string">" "</span>)) |
| <span class="hljs-comment">// Apply user defined operation</span> |
| .applyOperator(<span class="hljs-keyword">new</span> <span class="hljs-type">MyBoltOperator</span>()) |
| <span class="hljs-comment">// The result is logged</span> |
| .log() |
| </code></pre> |
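<p>Unlike a function passed to <code>map</code>, a bolt-style operator is an object. It can keep state between elements and can emit zero or more outputs per input. The sketch below models that idea in plain Scala. <code>SimpleOperator</code>, <code>RunningCountOperator</code>, <code>DuplicateNonEmptyOperator</code>, and <code>OperatorModel</code> are hypothetical names chosen for illustration. They are not Heron's <code>IStreamletOperator</code> interface or its execution model.</p>
<pre><code class="hljs css language-scala">// Simplified stand-in for a bolt-style operator; SimpleOperator is hypothetical,
// not Heron's IStreamletOperator.
trait SimpleOperator[R, T] {
  // Called once per input element; may call emit zero or more times.
  def execute(input: R, emit: T => Unit): Unit
}

// Keeps state across elements: tags each word with how many words this instance has seen.
final class RunningCountOperator extends SimpleOperator[String, String] {
  private var seen = 0L
  override def execute(input: String, emit: String => Unit): Unit = {
    seen += 1
    emit(s"$input#$seen")
  }
}

// Drops empty strings and emits each remaining word twice (zero or many outputs per input).
final class DuplicateNonEmptyOperator extends SimpleOperator[String, String] {
  override def execute(input: String, emit: String => Unit): Unit =
    if (input.nonEmpty) { emit(input); emit(input) }
}

object OperatorModel {
  // Feeds each element to the operator and collects whatever it emits, in order.
  def applyOperator[R, T](elements: Seq[R], op: SimpleOperator[R, T]): Seq[T] = {
    val out = Vector.newBuilder[T]
    elements.foreach(e => op.execute(e, t => out += t))
    out.result()
  }
}
</code></pre>
<p>For example, running <code>RunningCountOperator</code> over <code>a</code>, <code>b</code>, <code>c</code> yields <code>a#1</code>, <code>b#2</code>, <code>c#3</code>, because the operator instance keeps its counter between elements.</p>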
| <h3><a class="anchor" aria-hidden="true" id="repartition-operations"></a><a href="#repartition-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Repartition operations</h3> |
| <p>When you assign a number of <a href="#partitioning-and-parallelism">partitions</a> to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a <code>map</code> operation, then any <code>mapToKV</code>, <code>flatMap</code>, <code>filter</code>, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using <code>repartition</code>. Here's an example:</p> |
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> java.util.concurrent.<span class="hljs-type">ThreadLocalRandom</span> |
| |
| <span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder |
| |
| <span class="hljs-keyword">val</span> numbers = builder |
| .newSource(() => <span class="hljs-type">ThreadLocalRandom</span>.current().nextInt(<span class="hljs-number">1</span>, <span class="hljs-number">11</span>)) |
| |
| numbers |
| .setNumPartitions(<span class="hljs-number">5</span>) |
| .map(i => i + <span class="hljs-number">1</span>) |
| .repartition(<span class="hljs-number">2</span>) |
| .filter(i => i > <span class="hljs-number">7</span>) <span class="hljs-comment">// after the map, values range from 2 to 11</span> |
| .log() |
| </code></pre> |
| <p>In this example, the source streamlet emits random integers between 1 and 10 (inclusive). It is assigned 5 partitions, which the <code>map</code> operation inherits. After the <code>map</code> operation, the <code>repartition</code> function assigns 2 partitions to all downstream operations (here, <code>filter</code> and <code>log</code>).</p> |
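<p>The inheritance rule can be made concrete with a toy model. The model walks a chain of steps and records each step's effective partition count. <code>PartitionModel</code> and its <code>Step</code> types are hypothetical. The model mirrors only the rule described above, not how Heron actually builds the topology.</p>
<pre><code class="hljs css language-scala">// Toy model of how partition counts propagate along a chain of steps.
// Step, Inherit, and SetPartitions are hypothetical names, not Heron classes.
object PartitionModel {
  sealed trait Step { def name: String }
  // A step that keeps the partition count of the step before it (e.g., map, filter, log).
  final case class Inherit(name: String) extends Step
  // A step that sets a new count (e.g., setNumPartitions or repartition).
  final case class SetPartitions(name: String, n: Int) extends Step

  // Returns each step's effective partition count. `initial` is used only if the
  // first step inherits; its value is an assumption, not Heron's default.
  def effectivePartitions(steps: Seq[Step], initial: Int): Seq[(String, Int)] =
    steps
      .foldLeft((initial, Vector.empty[(String, Int)])) { case ((current, acc), step) =>
        val n = step match {
          case SetPartitions(_, k) => k
          case Inherit(_)          => current
        }
        (n, acc :+ (step.name -> n))
      }
      ._2
}
</code></pre>
<p>Modeling the example above as <code>source</code> (5), <code>map</code>, <code>repartition</code> (2), <code>filter</code>, and <code>log</code> gives 5, 5, 2, 2, and 2 partitions, respectively.</p>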
| <h3><a class="anchor" aria-hidden="true" id="sink-operations"></a><a href="#sink-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Sink operations</h3> |
| <p>In processing graphs like the ones you build using the Heron Streamlet API, <strong>sinks</strong> are essentially the terminal points in your graph, where your processing logic comes to an end. A processing graph can end with writing to a database, publishing to a topic in a pub-sub messaging system, and so on. With the Streamlet API, you can implement your own custom sinks. Here's an example:</p> |
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> org.apache.heron.streamlet.<span class="hljs-type">Context</span> |
| <span class="hljs-keyword">import</span> org.apache.heron.streamlet.scala.<span class="hljs-type">Sink</span> |
| |
| <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">FormattedLogSink</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">Sink</span>[<span class="hljs-type">String</span>] </span>{ |
| <span class="hljs-keyword">private</span> <span class="hljs-keyword">var</span> streamName: <span class="hljs-type">Option</span>[<span class="hljs-type">String</span>] = <span class="hljs-type">None</span> |
| |
| <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">setup</span></span>(context: <span class="hljs-type">Context</span>): <span class="hljs-type">Unit</span> = |
| streamName = <span class="hljs-type">Some</span>(context.getStreamName) |
| |
| <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">put</span></span>(tuple: <span class="hljs-type">String</span>): <span class="hljs-type">Unit</span> = |
| println(<span class="hljs-string">s"The current value of tuple is <span class="hljs-subst">$tuple</span> in stream: <span class="hljs-subst">${streamName.getOrElse("unknown")}</span>"</span>) |
| |
| <span class="hljs-keyword">override</span> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">cleanup</span></span>(): <span class="hljs-type">Unit</span> = {} |
| } |
| </code></pre> |
| <p>In this example, the sink fetches the name of the stream from the <code>Context</code> passed to the <code>setup</code> method. The <code>put</code> method specifies how the sink handles each element it receives (in this case, a formatted message is printed to stdout). The <code>cleanup</code> method lets you release any resources the sink holds when it is shut down.</p> |
| <p>Here is the <code>FormattedLogSink</code> at work in an example processing graph:</p> |
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder |
| |
| builder.newSource(() => <span class="hljs-string">"Here is a string to be passed to the sink"</span>) |
| .toSink(<span class="hljs-keyword">new</span> <span class="hljs-type">FormattedLogSink</span>) |
| </code></pre> |
| <blockquote> |
| <p><a href="#log-operations">Log operations</a> rely on a log sink that is provided out of the box. You'll need to implement other sinks yourself.</p> |
| </blockquote> |
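<p>The <code>setup</code>/<code>put</code>/<code>cleanup</code> contract can also be exercised without a cluster. In the plain-Scala sketch below, <code>LifecycleSink</code>, <code>CollectingSink</code>, and <code>SinkDriver</code> are hypothetical names. The driver only imitates an assumed call order: setup once, put once per element, and cleanup once at the end. The sink collects lines in memory instead of printing them, which makes its output easy to check in a unit test.</p>
<pre><code class="hljs css language-scala">import scala.collection.mutable.ArrayBuffer

// LifecycleSink is a hypothetical stand-in for the setup/put/cleanup contract; it is not Heron's Sink.
trait LifecycleSink[T] {
  def setup(name: String): Unit
  def put(tuple: T): Unit
  def cleanup(): Unit
}

// Like FormattedLogSink, but collects lines in memory instead of printing them.
final class CollectingSink extends LifecycleSink[String] {
  private var streamName: Option[String] = None
  val lines: ArrayBuffer[String] = ArrayBuffer.empty[String]
  var cleanedUp: Boolean = false

  override def setup(name: String): Unit = streamName = Some(name)
  override def put(tuple: String): Unit =
    lines += s"The current value of tuple is $tuple in stream: ${streamName.getOrElse("unknown")}"
  override def cleanup(): Unit = cleanedUp = true
}

object SinkDriver {
  // Calls setup once, put once per element, and cleanup once at the end, even if put throws.
  def run[T](sink: LifecycleSink[T], streamName: String, elements: Seq[T]): Unit = {
    sink.setup(streamName)
    try elements.foreach(sink.put)
    finally sink.cleanup()
  }
}
</code></pre>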
| <h3><a class="anchor" aria-hidden="true" id="log-operations"></a><a href="#log-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Log operations</h3> |
| <p>Log operations are special cases of consume operations that log streamlet elements to stdout.</p> |
| <blockquote> |
| <p>Streamlet elements are logged using their <code>toString</code> representations, at the <code>INFO</code> level.</p> |
| </blockquote> |
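<p>Because log output is based on <code>toString</code>, the element type determines what each log line contains. As a small illustration with hypothetical <code>WordCount</code> and <code>PrettyWordCount</code> types, a case class renders as <code>WordCount(heron,3)</code> by default, while overriding <code>toString</code> produces <code>heron -> 3</code>:</p>
<pre><code class="hljs css language-scala">// Hypothetical element types, used only to show how toString shapes log output.
final case class WordCount(word: String, count: Long)

final case class PrettyWordCount(word: String, count: Long) {
  // A custom toString replaces the default case-class rendering.
  override def toString: String = s"$word -> $count"
}
</code></pre>
<p>Elements of the second type passed through <code>.log()</code> should therefore appear in the form <code>heron -> 3</code>.</p>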
| <h3><a class="anchor" aria-hidden="true" id="consume-operations"></a><a href="#consume-operations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consume operations</h3> |
| <p>Consume operations are like <a href="#sink-operations">sink operations</a> except they don't require implementing a full sink interface. Consume operations are thus suited for simple operations like formatted logging. Here's an example:</p> |
| <pre><code class="hljs css language-scala"><span class="hljs-keyword">import</span> scala.util.<span class="hljs-type">Random</span> |
| |
| <span class="hljs-keyword">val</span> builder = <span class="hljs-type">Builder</span>.newBuilder |
| |
| builder |
| .newSource(() => <span class="hljs-type">Random</span>.nextInt(<span class="hljs-number">10</span>)) |
| .filter(i => i % <span class="hljs-number">2</span> == <span class="hljs-number">0</span>) |
| .consume(i => println(<span class="hljs-string">s"Even number found: <span class="hljs-subst">$i</span>"</span>)) |
| </code></pre> |
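<p>Like a sink, a consume operation hands each element to user code purely for its side effects. On an in-memory collection the closest analogue is <code>foreach</code>, as in this sketch of the example above. <code>ConsumeModel</code> is hypothetical. It replaces the unseeded <code>Random</code> with a seeded generator so that runs are repeatable.</p>
<pre><code class="hljs css language-scala">import scala.util.Random

// Plain-collections analogue of newSource -> filter -> consume; ConsumeModel is hypothetical.
object ConsumeModel {
  // Draws n values in [0, 10) from a seeded generator, keeps the even ones,
  // and hands each to `consumer`, which runs only for its side effect.
  def run(n: Int, seed: Long)(consumer: Int => Unit): Unit = {
    val rng = new Random(seed)
    Iterator
      .continually(rng.nextInt(10))
      .take(n)
      .filter(i => i % 2 == 0)
      .foreach(consumer)
  }
}
</code></pre>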
| </span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/0.20.4-incubating/topology-development-topology-api-python"><span class="arrow-prev">← </span><span>The Heron Topology API for Python</span></a><a class="docs-next button" href="/docs/0.20.4-incubating/client-api-docs-overview"><span>Client API Docs</span><span class="arrow-next"> →</span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#getting-started">Getting started</a><ul class="toc-headings"><li><a href="#maven-setup">Maven setup</a></li></ul></li><li><a href="#streamlet-api-topology-configuration">Streamlet API topology configuration</a><ul class="toc-headings"><li><a href="#delivery-semantics">Delivery semantics</a></li></ul></li><li><a href="#streamlets">Streamlets</a></li><li><a href="#operations">Operations</a><ul class="toc-headings"><li><a href="#map-operations">Map operations</a></li><li><a href="#flatmap-operations">FlatMap operations</a></li><li><a href="#filter-operations">Filter operations</a></li><li><a href="#union-operations">Union operations</a></li><li><a href="#clone-operations">Clone operations</a></li><li><a href="#transform-operations">Transform operations</a></li><li><a href="#join-operations">Join operations</a></li><li><a href="#key-by-operations">Key by operations</a></li><li><a href="#reduce-by-key-operations">Reduce by key operations</a></li><li><a href="#reduce-by-key-and-window-operations">Reduce by key and window operations</a></li><li><a href="#count-by-key-operations">Count by key operations</a></li><li><a href="#count-by-key-and-window-operations">Count by key and window operations</a></li><li><a href="#split-operations">Split operations</a></li><li><a href="#with-stream-operations">With stream operations</a></li><li><a href="#apply-operator-operations">Apply operator operations</a></li><li><a href="#repartition-operations">Repartition operations</a></li><li><a href="#sink-operations">Sink 
operations</a></li><li><a href="#log-operations">Log operations</a></li><li><a href="#consume-operations">Consume operations</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><div class="apache-disclaimer">Apache Heron is an effort undergoing incubation at <a target="_blank" href="https://apache.org/">The Apache Software Foundation (ASF)</a> sponsored by the Apache Incubator PMC. Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects. While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.<br/><br/>Apache®, the names of Apache projects, and the feather logo are either <a rel="external" href="https://www.apache.org/foundation/marks/list/">registered trademarks or trademarks</a> of the Apache Software Foundation in the United States and/or other countries.<br/><br/><div class="copyright-box">Copyright © 2022 the Apache Software Foundation, Apache Heron, Heron, |
| Apache, the Apache feather Logo, and the Apache Heron project logo are either registered |
| trademarks or trademarks of the Apache Software Foundation.</div></div><div class="apache-links"><a class="item" rel="external" href="https://incubator.apache.org/">Apache Incubator</a><div><a class="item" rel="external" href="https://www.apache.org/">About the ASF</a></div><div><a class="item" rel="external" href="https://www.apache.org/events/current-event">Events</a></div><div><a class="item" rel="external" href="https://www.apache.org/foundation/thanks.html">Thanks</a></div><div><a class="item" rel="external" href="https://www.apache.org/foundation/sponsorship.html">Become a Sponsor</a></div><div><a class="item" rel="external" href="https://www.apache.org/security/">Security</a></div><div><a class="item" rel="external" href="https://www.apache.org/licenses/">License</a></div></div></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html> |