<!-- blob: 3e5dd2e26531acace73027a32a243fd9942f9709 [file] [log] [blame] -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="description" content="">
<meta name="keywords" content=" ">
<title>How can I run analytics on several tuples in parallel? | Apache Edgent Documentation</title>
<link rel="stylesheet" type="text/css" href="../css/syntax.css">
<link rel="stylesheet" type="text/css" href="../css/font-awesome.min.css">
<!--<link rel="stylesheet" type="text/css" href="../css/bootstrap.min.css">-->
<link rel="stylesheet" type="text/css" href="../css/modern-business.css">
<link rel="stylesheet" type="text/css" href="../css/lavish-bootstrap.css">
<link rel="stylesheet" type="text/css" href="../css/customstyles.css">
<link rel="stylesheet" type="text/css" href="../css/theme-blue.css">
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/2.1.4/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery-cookie/1.4.1/jquery.cookie.min.js"></script>
<script src="../js/jquery.navgoco.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/anchor-js/2.0.0/anchor.min.js"></script>
<script src="../js/toc.js"></script>
<script src="../js/customscripts.js"></script>
<link rel="shortcut icon" href="../common_images/favicon.ico" type="image/x-icon">
<!-- HTML5 Shim and Respond.js IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/libs/html5shiv/3.7.0/html5shiv.js"></script>
<script src="https://oss.maxcdn.com/libs/respond.js/1.4.2/respond.min.js"></script>
<![endif]-->
<script>
// Once the DOM is ready, activate the tooltip plugin on every element that
// opts in through data-toggle="tooltip".
$(document).ready(function initTooltips() {
  var tooltipTargets = $('[data-toggle="tooltip"]');
  tooltipTargets.tooltip();
});
</script>
</head>
<body>
<!-- Navigation -->
<nav class="navbar navbar-inverse navbar-fixed-top" role="navigation">
<div class="container topnavlinks">
<div class="navbar-header">
<button type="button" class="navbar-toggle" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a class="fa fa-home fa-lg navbar-brand" href="../docs/home.html">&nbsp;<span class="projectTitle"> Apache Edgent Documentation</span></a>
</div>
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav navbar-right">
<!-- entries without drop-downs appear here -->
<!-- conditional logic to control which topnav appears for the audience defined in the configuration file.-->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">GitHub Repos<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="https://github.com/apache/incubator-edgent" target="_blank">Source code</a></li>
<li><a href="https://github.com/apache/incubator-edgent-samples" target="_blank">Edgent Samples</a></li>
<li><a href="https://github.com/apache/incubator-edgent-website" target="_blank">Website/Documentation</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Javadoc<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="../javadoc/latest">latest</a></li>
<li><a href="../javadoc/r1.2.0">1.2.0</a></li>
<li><a href="../javadoc/r1.1.0">1.1.0</a></li>
<li><a href="../javadoc/r1.0.0">1.0.0</a></li>
<li><a href="../javadoc/r0.4.0">0.4.0</a></li>
</ul>
</li>
<!-- entries with drop-downs appear here -->
<!-- conditional logic to control which topnav appears for the audience defined in the configuration file.-->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Edgent Resources<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="../docs/downloads.html">Download</a></li>
<li><a href="../docs/faq.html">FAQ</a></li>
<li class="dropdownActive"><a href="/">edgent.apache.org</a></li>
</ul>
</li>
<!-- special insertion -->
<!-- Send feedback function -->
<script>
/**
 * Navigates the window to a recipient-less mailto: URI whose subject and body
 * are pre-filled with feedback text and the address of the current page.
 *
 * @param href Not read by this function; kept so existing callers that pass
 *             an argument keep working.
 */
function SendLinkByMail(href) {
  var feedbackSubject = "Apache Edgent Documentation feedback";
  var feedbackBody = "I have some feedback about the How can I run analytics on several tuples in parallel? page: " + window.location.href;
  // Both parts are percent-encoded so spaces and punctuation survive the URI.
  var mailtoParts = [
    "mailto:?subject=",
    encodeURIComponent(feedbackSubject),
    "&body=",
    encodeURIComponent(feedbackBody)
  ];
  window.location.href = mailtoParts.join("");
}
</script>
<li><a href="mailto:dev@edgent.incubator.apache.org" target="_blank"><i class="fa fa-envelope-o"></i> Feedback</a></li>
<!--uncomment this block if you want simple search instead of algolia-->
<li>
<!--start search-->
<div id="search-demo-container">
<input type="text" id="search-input" placeholder="search...">
<ul id="results-container"></ul>
</div>
<script src="../js/jekyll-search.js" type="text/javascript"></script>
<script type="text/javascript">
// Wire the top-navigation search box to the client-side search index.
SimpleJekyllSearch.init({
  searchInput: document.getElementById('search-input'),
  resultsContainer: document.getElementById('results-container'),
  dataSource: '../search.json',
  // The tooltip uses the per-result {title} placeholder. It previously carried
  // this page's own title, so every hit showed the same, wrong tooltip.
  // NOTE(review): assumes the search templater substitutes every {title}
  // occurrence and that index titles contain no double quotes - confirm
  // against search.json.
  searchResultTemplate: '<li><a href="{url}" title="{title}">{title}</a></li>',
  noResultsText: 'No results found.',
  limit: 10,
  // No trailing comma after the last property: legacy IE parsers, which this
  // page still targets with its IE<9 shims, reject it.
  fuzzy: true
});
</script>
<!--end search-->
</li>
</ul>
</div>
</div>
<!-- /.container -->
</nav>
<!-- Page Content -->
<div class="container">
<div class="col-lg-12">&nbsp;</div>
<!-- Content Row -->
<div class="row">
<!-- Sidebar Column -->
<div class="col-md-3">
<script>
// Sidebar setup: turns the #mysidebar list into a navgoco accordion menu and
// binds the optional "collapse all" / "expand all" links.
$(document).ready(function () {
  var SIDEBAR = "#mysidebar";

  // Plugin configuration, passed to navgoco unchanged.
  var sidebarOptions = {
    caretHtml: '',
    accordion: true,
    openClass: 'active', // open
    save: true,
    cookie: {
      name: 'navgoco',
      expires: false,
      path: '/'
    },
    slide: {
      duration: 400,
      easing: 'swing'
    }
  };
  $(SIDEBAR).navgoco(sidebarOptions);

  // Builds a click handler that cancels the link's default action and asks
  // navgoco to toggle the whole menu to the given state.
  function makeToggleAll(expand) {
    return function (evt) {
      evt.preventDefault();
      $(SIDEBAR).navgoco('toggle', expand);
    };
  }

  // The matching links are commented out in the sidebar markup on this page,
  // so as the page stands these selectors match no elements.
  $("#collapseAll").click(makeToggleAll(false));
  $("#expandAll").click(makeToggleAll(true));
});
</script>
<ul id="mysidebar" class="nav">
<span class="siteTagline">Edgent</span>
<span class="versionTagline">Version 1.2.0-incubating</span>
<li><a href="#">Overview</a>
<ul>
<li><a href="../docs/edgent_index.html">Introduction</a></li>
<li><a href="../docs/home.html">Edgent Overview</a></li>
<li><a href="../docs/power-of-edgent.html">The Power of Edgent</a></li>
<li><a href="../docs/faq.html">FAQ</a></li>
</ul>
<li><a href="#">Get Started</a>
<ul>
<li><a href="../docs/downloads.html">Downloads</a></li>
<li><a href="../docs/edgent-getting-started.html">Getting Started Guide</a></li>
<li><a href="../docs/edgent-getting-started-samples.html">Quickstart with Edgent Samples</a></li>
<li><a href="../docs/application-development.html">Understanding App Development</a></li>
<li><a href="../docs/quickstart.html">Quickstart IBM Watson IoT Platform</a></li>
<li><a href="../docs/streaming-concepts.html">Streaming concepts</a></li>
<li><a href="../docs/common-edgent-operations.html">Common operations</a></li>
</ul>
<li><a href="#">Edgent Cookbook</a>
<ul>
<li><a href="../recipes/recipe_hello_edgent.html">Hello Edgent!</a></li>
<li><a href="../recipes/recipe_source_function.html">Writing a source function</a></li>
<li><a href="../recipes/recipe_value_out_of_range.html">Detecting a sensor value out of expected range</a></li>
<li><a href="../recipes/recipe_different_processing_against_stream.html">Applying different processing against a single stream</a></li>
<li><a href="../recipes/recipe_combining_streams_processing_results.html">Splitting a stream to apply different processing and combining the results into a single stream</a></li>
<li><a href="../recipes/recipe_external_filter_range.html">Using an external configuration file for filter ranges</a></li>
<li><a href="../recipes/recipe_adaptable_filter_range.html">Changing a filter's range</a></li>
<li><a href="../recipes/recipe_adaptable_polling_source.html">Changing a polled source stream's period</a></li>
<li><a href="../recipes/recipe_adaptable_deadtime_filter.html">Using an adaptable deadtime filter</a></li>
<li><a href="../recipes/recipe_dynamic_analytic_control.html">Dynamically enabling analytic flows</a></li>
<li class="active"><a href="../recipes/recipe_parallel_analytics.html">How can I run analytics on several tuples in parallel?</a></li>
<li><a href="../recipes/recipe_concurrent_analytics.html">How can I run several analytics on a tuple concurrently?</a></li>
<li><a href="../recipes/recipe_writing_a_connector.html">How do I write a connector?</a></li>
</ul>
<li><a href="#">Using the Console</a>
<ul>
<li><a href="../docs/console.html">Using the console</a></li>
</ul>
<li><a href="#">Get Involved</a>
<ul>
<li><a href="../docs/community.html">How to participate</a></li>
<li><a href="../docs/committers.html">Committers</a></li>
</ul>
<!-- if you aren't using the accordion, uncomment this block:
<p class="external">
<a href="#" id="collapseAll">Collapse All</a> | <a href="#" id="expandAll">Expand All</a>
</p>
-->
<br/>
</li>
</ul>
<div class="row">
<div class="col-md-12">
<!-- this handles the automatic toc. use ## for subheads to auto-generate the on-page minitoc. if you use html tags, you must supply an ID for the heading element in order for it to appear in the minitoc. -->
<script>
// Builds the on-page mini table of contents and makes its links scroll to
// the chosen heading with a small offset.
// NOTE(review): this page declares id="toc" twice (sidebar and article body);
// an id selector is expected to resolve to a single element, so confirm which
// container is meant to receive the generated list.
$(document).ready(function () {
  // Generate the list from the h2/h3/h4 headings.
  $('#toc').toc({ minimumHeaders: 0, listType: 'ul', showSpeed: 0, headers: 'h2,h3,h4' });

  /* this offset helps account for the space taken up by the floating toolbar. */
  $('#toc').on('click', 'a', function () {
    var target = $(this.getAttribute('href'));
    // If nothing matches the href, offset() has no coordinates to return and
    // reading .top would throw. Let the browser handle the link instead.
    if (!target.length) {
      return true;
    }
    $(window).scrollTop(target.offset().top - 10);
    // Cancel the default jump so the adjusted scroll position is kept.
    return false;
  });
});
</script>
<div id="toc"></div>
</div>
</div>
</div>
<!-- this highlights the active parent class in the navgoco sidebar. this is critical so that the parent expands when you're viewing a page. This must appear below the sidebar code above. Otherwise, if placed inside customscripts.js, the script runs before the sidebar code runs and the class never gets inserted.-->
<script>
// Mark every ancestor list item of the current page's sidebar entry as active
// so the enclosing section can expand. addClass is used instead of toggleClass
// so that an ancestor which already carries the class is never switched off.
$("li.active").parents("li").addClass("active");
</script>
<!-- Content Column -->
<div class="col-md-9">
<div class="post-header">
<h1 class="post-title-main">How can I run analytics on several tuples in parallel?</h1>
</div>
<div class="post-content">
<!-- this handles the automatic toc. use ## for subheads to auto-generate the on-page minitoc. if you use html tags, you must supply an ID for the heading element in order for it to appear in the minitoc. -->
<script>
// Builds the on-page mini table of contents and makes its links scroll to
// the chosen heading with a small offset.
// NOTE(review): this page declares id="toc" twice (sidebar and article body);
// an id selector is expected to resolve to a single element, so confirm which
// container is meant to receive the generated list.
$(document).ready(function () {
  // Generate the list from the h2/h3/h4 headings.
  $('#toc').toc({ minimumHeaders: 0, listType: 'ul', showSpeed: 0, headers: 'h2,h3,h4' });

  /* this offset helps account for the space taken up by the floating toolbar. */
  $('#toc').on('click', 'a', function () {
    var target = $(this.getAttribute('href'));
    // If nothing matches the href, offset() has no coordinates to return and
    // reading .top would throw. Let the browser handle the link instead.
    if (!target.length) {
      return true;
    }
    $(window).scrollTop(target.offset().top - 10);
    // Cancel the default jump so the adjusted scroll position is kept.
    return false;
  });
});
</script>
<div id="toc"></div>
<a target="_blank" href="https://github.com/apache/incubator-edgent-website/blob/master/site/recipes/recipe_parallel_analytics.md" class="btn btn-default githubEditButton" role="button"><i class="fa fa-github fa-lg"></i> Edit me</a>
<p>If the duration of your per-tuple analytic processing makes your application unable to keep up with the tuple ingest rate or result generation rate, you can often run analytics on several tuples in parallel to improve performance.</p>
<p>The overall processing time for a single tuple is still the same, but the processing for each tuple is overlapped. In the extreme, your application may be able to process N tuples in the same time that it would have processed one.</p>
<p>This usage model is in contrast to what&#39;s been called <em>concurrent analytics</em>, where multiple different independent analytics for a single tuple are performed concurrently, as when using <code>PlumbingStreams.concurrent()</code>.</p>
<p>For example, imagine your analytic pipeline has three stages, A1, A2, and A3, and that A2 dominates the processing time. You want to change the serial processing flow graph from:</p>
<div class="highlight"><pre><code class="language-" data-lang="">sensorReadings&lt;T&gt; -&gt; A1 -&gt; A2 -&gt; A3 -&gt; results&lt;R&gt;
</code></pre></div>
<p>to a flow where the A2 analytics run on several tuples in parallel in a flow like:</p>
<div class="highlight"><pre><code class="language-" data-lang="">                           |-&gt; A2-channel0 -&gt;|
sensorReadings&lt;T&gt; -&gt; A1 -&gt; |-&gt; A2-channel1 -&gt;| -&gt; A3 -&gt; results&lt;R&gt;
                           |-&gt; A2-channel2 -&gt;|
                           |-&gt; A2-channel3 -&gt;|
                           |-&gt; A2-channel4 -&gt;|
                                  ...
</code></pre></div>
<p>The key to the above flow is to use a <em>splitter</em> to distribute the tuples among the parallel channels. Each of the parallel channels also needs a thread to run its analytic pipeline.</p>
<p><code>PlumbingStreams.parallel()</code> builds a parallel flow graph for you. Alternatively, you can use <code>TStream.split()</code>, <code>PlumbingStreams.isolate()</code>, and <code>TStream.union()</code> and build a parallel flow graph yourself.</p>
<p>More specifically <code>parallel()</code> generates a flow like:</p>
<div class="highlight"><pre><code class="language-" data-lang="">                                   |-&gt; isolate(10) -&gt; pipeline-ch0 -&gt; |
stream -&gt; split(width,splitter) -&gt; |-&gt; isolate(10) -&gt; pipeline-ch1 -&gt; |-&gt; union -&gt; isolate(width)
                                   |-&gt; isolate(10) -&gt; pipeline-ch2 -&gt; |
                                       ...
</code></pre></div>
<p>It&#39;s easy to use <code>parallel()</code>!</p>
<h2 id="define-the-splitter">Define the splitter</h2>
<p>The splitter function partitions the tuples among the parallel channels. <code>PlumbingStreams.roundRobinSplitter()</code> is a commonly used splitter that simply cycles among each channel in succession. The round robin strategy works great when the processing time of tuples is uniform. Other splitter functions may use information in the tuple to decide how to partition them.</p>
<p>This recipe just uses the round robin splitter for a <code>TStream&lt;Double&gt;</code>.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kt">int</span> <span class="n">width</span> <span class="o">=</span> <span class="mi">5</span><span class="o">;</span> <span class="c1">// number of parallel channels</span>
<span class="n">ToIntFunction</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">splitter</span> <span class="o">=</span> <span class="n">PlumbingStreams</span><span class="o">.</span><span class="na">roundRobinSplitter</span><span class="o">(</span><span class="n">width</span><span class="o">);</span>
</code></pre></div>
<p>Another possibility is to use a &quot;load balanced splitter&quot; configuration. That is covered below.</p>
<h2 id="define-the-pipeline-to-run-in-parallel">Define the pipeline to run in parallel</h2>
<p>Define a <code>BiFunction&lt;TStream&lt;T&gt;, Integer, TStream&lt;R&gt;&gt;</code> that builds the pipeline. That is, define a function that receives a <code>TStream&lt;T&gt;</code> and an integer <code>channel</code> and creates a pipeline for that channel that returns a <code>TStream&lt;R&gt;</code>.</p>
<p>Many pipelines don&#39;t care what channel they&#39;re being constructed for. While the pipeline function typically yields the same pipeline processing for each channel there is no requirement for it to do so.</p>
<p>In this simple recipe the pipeline receives a <code>TStream&lt;Double&gt;</code> as input and generates a <code>TStream&lt;String&gt;</code> as output.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">static</span> <span class="n">BiFunction</span><span class="o">&lt;</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">TStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="nf">pipeline</span><span class="o">()</span> <span class="o">{</span>
<span class="c1">// a simple 4 stage pipeline simulating some amount of work by sleeping</span>
<span class="k">return</span> <span class="o">(</span><span class="n">stream</span><span class="o">,</span> <span class="n">channel</span><span class="o">)</span> <span class="o">-&gt;</span>
<span class="o">{</span>
<span class="n">String</span> <span class="n">tagPrefix</span> <span class="o">=</span> <span class="s">"pipeline-ch"</span><span class="o">+</span><span class="n">channel</span><span class="o">;</span>
<span class="k">return</span> <span class="n">stream</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">tuple</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">sleep</span><span class="o">(</span><span class="mi">1000</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MILLISECONDS</span><span class="o">);</span>
<span class="k">return</span> <span class="s">"This is the "</span><span class="o">+</span><span class="n">tagPrefix</span><span class="o">+</span><span class="s">" result for tuple "</span><span class="o">+</span><span class="n">tuple</span><span class="o">;</span>
<span class="o">}).</span><span class="na">tag</span><span class="o">(</span><span class="n">tagPrefix</span><span class="o">+</span><span class="s">".stage1"</span><span class="o">)</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">Functions</span><span class="o">.</span><span class="na">identity</span><span class="o">()).</span><span class="na">tag</span><span class="o">(</span><span class="n">tagPrefix</span><span class="o">+</span><span class="s">".stage2"</span><span class="o">)</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">Functions</span><span class="o">.</span><span class="na">identity</span><span class="o">()).</span><span class="na">tag</span><span class="o">(</span><span class="n">tagPrefix</span><span class="o">+</span><span class="s">".stage3"</span><span class="o">)</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">Functions</span><span class="o">.</span><span class="na">identity</span><span class="o">()).</span><span class="na">tag</span><span class="o">(</span><span class="n">tagPrefix</span><span class="o">+</span><span class="s">".stage4"</span><span class="o">);</span>
<span class="o">};</span>
<span class="o">}</span>
</code></pre></div>
<h2 id="build-the-parallel-flow">Build the parallel flow</h2>
<p>Given a width, splitter and pipeline function it just takes a single call:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">results</span> <span class="o">=</span> <span class="n">PlumbingStreams</span><span class="o">.</span><span class="na">parallel</span><span class="o">(</span><span class="n">readings</span><span class="o">,</span> <span class="n">width</span><span class="o">,</span> <span class="n">splitter</span><span class="o">,</span> <span class="n">pipeline</span><span class="o">());</span>
</code></pre></div>
<h2 id="load-balanced-parallel-flow">Load balanced parallel flow</h2>
<p>A load balanced parallel flow allocates an incoming tuple to the first available parallel channel. When tuple processing times are variable, using a load balanced parallel flow can result in greater overall throughput.</p>
<p>To create a load balanced parallel flow simply use the <code>parallelBalanced()</code> method instead of <code>parallel()</code>. Everything is the same except you don&#39;t supply a splitter: </p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">results</span> <span class="o">=</span> <span class="n">PlumbingStreams</span><span class="o">.</span><span class="na">parallelBalanced</span><span class="o">(</span><span class="n">readings</span><span class="o">,</span> <span class="n">width</span><span class="o">,</span> <span class="n">pipeline</span><span class="o">());</span>
</code></pre></div>
<h2 id="the-final-application">The final application</h2>
<p>When the application is run it prints out 5 (width) tuples every second. Without the parallel channels, it would only print one tuple each second.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">package</span> <span class="n">edgent</span><span class="o">.</span><span class="na">samples</span><span class="o">.</span><span class="na">topology</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.Date</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.console.server.HttpServer</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.function.BiFunction</span><span class="o">;</span>
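<span class="kn">import</span> <span class="nn">org.apache.edgent.function.Functions</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.function.ToIntFunction</span><span class="o">;</span>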
<span class="kn">import</span> <span class="nn">org.apache.edgent.providers.development.DevelopmentProvider</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.providers.direct.DirectProvider</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.samples.utils.sensor.SimpleSimulatedSensor</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.topology.TStream</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.topology.Topology</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.topology.plumbing.PlumbingStreams</span><span class="o">;</span>
<span class="cm">/**
* A recipe for parallel analytics.
*/</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">ParallelRecipe</span> <span class="o">{</span>
<span class="cm">/**
* Process several tuples in parallel in a replicated pipeline.
*/</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">DirectProvider</span> <span class="n">dp</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DevelopmentProvider</span><span class="o">();</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"development console url: "</span>
<span class="o">+</span> <span class="n">dp</span><span class="o">.</span><span class="na">getServices</span><span class="o">().</span><span class="na">getService</span><span class="o">(</span><span class="n">HttpServer</span><span class="o">.</span><span class="na">class</span><span class="o">).</span><span class="na">getConsoleUrl</span><span class="o">());</span>
<span class="n">Topology</span> <span class="n">top</span> <span class="o">=</span> <span class="n">dp</span><span class="o">.</span><span class="na">newTopology</span><span class="o">(</span><span class="s">"ParallelRecipe"</span><span class="o">);</span>
<span class="c1">// The number of parallel processing channels to generate</span>
<span class="kt">int</span> <span class="n">width</span> <span class="o">=</span> <span class="mi">5</span><span class="o">;</span>
<span class="c1">// Define the splitter</span>
<span class="n">ToIntFunction</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">splitter</span> <span class="o">=</span> <span class="n">PlumbingStreams</span><span class="o">.</span><span class="na">roundRobinSplitter</span><span class="o">(</span><span class="n">width</span><span class="o">);</span>
<span class="c1">// Generate a polled simulated sensor stream</span>
<span class="n">SimpleSimulatedSensor</span> <span class="n">sensor</span> <span class="o">=</span> <span class="k">new</span> <span class="n">SimpleSimulatedSensor</span><span class="o">();</span>
<span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">readings</span> <span class="o">=</span> <span class="n">top</span><span class="o">.</span><span class="na">poll</span><span class="o">(</span><span class="n">sensor</span><span class="o">,</span> <span class="mi">10</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MILLISECONDS</span><span class="o">)</span>
<span class="o">.</span><span class="na">tag</span><span class="o">(</span><span class="s">"readings"</span><span class="o">);</span>
<span class="c1">// Build the parallel analytic pipelines flow</span>
<span class="n">TStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">results</span> <span class="o">=</span>
<span class="n">PlumbingStreams</span><span class="o">.</span><span class="na">parallel</span><span class="o">(</span><span class="n">readings</span><span class="o">,</span> <span class="n">width</span><span class="o">,</span> <span class="n">splitter</span><span class="o">,</span> <span class="n">pipeline</span><span class="o">())</span>
<span class="o">.</span><span class="na">tag</span><span class="o">(</span><span class="s">"results"</span><span class="o">);</span>
<span class="c1">// Print out the results.</span>
<span class="n">results</span><span class="o">.</span><span class="na">sink</span><span class="o">(</span><span class="n">tuple</span> <span class="o">-&gt;</span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="k">new</span> <span class="n">Date</span><span class="o">().</span><span class="na">toString</span><span class="o">()</span> <span class="o">+</span> <span class="s">" "</span> <span class="o">+</span> <span class="n">tuple</span><span class="o">));</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Notice that "</span><span class="o">+</span><span class="n">width</span><span class="o">+</span><span class="s">" results are generated every second - one from each parallel channel."</span>
<span class="o">+</span> <span class="s">"\nOnly one result would be generated each second if performed serially."</span><span class="o">);</span>
<span class="n">dp</span><span class="o">.</span><span class="na">submit</span><span class="o">(</span><span class="n">top</span><span class="o">);</span>
<span class="o">}</span>
<span class="cm">/** Function to create analytic pipeline and add it to a stream */</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="n">BiFunction</span><span class="o">&lt;</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;,</span><span class="n">Integer</span><span class="o">,</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="nf">pipeline</span><span class="o">()</span> <span class="o">{</span>
<span class="c1">// a simple 3 stage pipeline simulating some amount of work by sleeping</span>
<span class="k">return</span> <span class="o">(</span><span class="n">stream</span><span class="o">,</span> <span class="n">channel</span><span class="o">)</span> <span class="o">-&gt;</span>
<span class="o">{</span>
<span class="n">String</span> <span class="n">tagPrefix</span> <span class="o">=</span> <span class="s">"pipeline-ch"</span><span class="o">+</span><span class="n">channel</span><span class="o">;</span>
<span class="k">return</span> <span class="n">stream</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">tuple</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">sleep</span><span class="o">(</span><span class="mi">1000</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MILLISECONDS</span><span class="o">);</span>
<span class="k">return</span> <span class="s">"This is the "</span><span class="o">+</span><span class="n">tagPrefix</span><span class="o">+</span><span class="s">" result for tuple "</span><span class="o">+</span><span class="n">tuple</span><span class="o">;</span>
<span class="o">}).</span><span class="na">tag</span><span class="o">(</span><span class="n">tagPrefix</span><span class="o">+</span><span class="s">".stage1"</span><span class="o">)</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">Functions</span><span class="o">.</span><span class="na">identity</span><span class="o">()).</span><span class="na">tag</span><span class="o">(</span><span class="n">tagPrefix</span><span class="o">+</span><span class="s">".stage2"</span><span class="o">)</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">Functions</span><span class="o">.</span><span class="na">identity</span><span class="o">()).</span><span class="na">tag</span><span class="o">(</span><span class="n">tagPrefix</span><span class="o">+</span><span class="s">".stage3"</span><span class="o">);</span>
<span class="o">};</span>
<span class="o">}</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">sleep</span><span class="o">(</span><span class="kt">long</span> <span class="n">period</span><span class="o">,</span> <span class="n">TimeUnit</span> <span class="n">unit</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">RuntimeException</span> <span class="o">{</span>
<span class="k">try</span> <span class="o">{</span>
<span class="n">Thread</span><span class="o">.</span><span class="na">sleep</span><span class="o">(</span><span class="n">unit</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="n">period</span><span class="o">));</span>
<span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">InterruptedException</span> <span class="n">e</span><span class="o">)</span> <span class="o">{</span>
<span class="k">throw</span> <span class="k">new</span> <span class="nf">RuntimeException</span><span class="o">(</span><span class="s">"Interrupted"</span><span class="o">,</span> <span class="n">e</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
<div class="tags">
</div>
<!--
<div id="disqus_thread"></div>
<script type="text/javascript">
/* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */
var disqus_shortname = 'idrbwjekyll'; // required: replace example with your forum shortname
/* * * DON'T EDIT BELOW THIS LINE * * */
(function() {
var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
(document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
})();
</script>
<noscript>Please enable JavaScript to view the <a href="https://disqus.com/?ref_noscript">comments powered by Disqus.</a></noscript>
-->
</div>
<footer>
<div class="row">
<div class="col-lg-12 footer">
Site last
generated: Apr 3, 2019 <br/>
</div>
</div>
<br/>
<div class="row">
<div class="col-md-12">
<p class="small">Apache Edgent is an effort undergoing Incubation at The Apache Software
Foundation (ASF), sponsored by the Incubator. Incubation is required of all newly accepted projects
until a further review indicates that the infrastructure, communications, and decision making process
have stabilized in a manner consistent with other successful ASF projects. While incubation status is
not necessarily a reflection of the completeness or stability of the code, it does indicate that the
project has yet to be fully endorsed by the ASF.</p>
</div>
</div>
<div class="row">
<div class="col-md-12">
<p class="small">Copyright © 2016 The Apache Software Foundation. Licensed under the Apache
License, Version 2.0.
Apache, the Apache Feather logo, and the Apache Incubator project logo are trademarks of The Apache
Software Foundation. All other marks mentioned may be trademarks or registered trademarks of their
respective owners.</p>
</div>
</div>
<div class="container">
<div class="row">
<div>
<img class="img-responsive center-block" src="../img/edgent_incubation.png" style="display: block; margin: auto;" alt="">
</div>
</div>
</div>
</footer>
</div>
</div><!-- /.row -->
</div> <!-- /.container -->
</body>
</html>