blob: 6ffaabceb83aeb0945f8dec906909d6082dc9c24 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
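<meta name="description" content="Apache Quarks recipe: run several independent analytics on each tuple concurrently with PlumbingStreams.concurrent() and combine their results.">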
<meta name="keywords" content=" ">
<title>How can I run several analytics on a tuple concurrently? | Apache Quarks Documentation</title>
<link rel="stylesheet" type="text/css" href="../css/syntax.css">
<link rel="stylesheet" type="text/css" href="../css/font-awesome.min.css">
<!--<link rel="stylesheet" type="text/css" href="../css/bootstrap.min.css">-->
<link rel="stylesheet" type="text/css" href="../css/modern-business.css">
<link rel="stylesheet" type="text/css" href="../css/lavish-bootstrap.css">
<link rel="stylesheet" type="text/css" href="../css/customstyles.css">
<link rel="stylesheet" type="text/css" href="../css/theme-blue.css">
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/2.1.4/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery-cookie/1.4.1/jquery.cookie.min.js"></script>
<script src="../js/jquery.navgoco.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/anchor-js/2.0.0/anchor.min.js"></script>
<script src="../js/toc.js"></script>
<script src="../js/customscripts.js"></script>
<link rel="shortcut icon" href="../common_images/favicon.ico" type="image/x-icon">
<!-- HTML5 Shim and Respond.js IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/libs/html5shiv/3.7.0/html5shiv.js"></script>
<script src="https://oss.maxcdn.com/libs/respond.js/1.4.2/respond.min.js"></script>
<![endif]-->
<script>
// Once the DOM is ready, activate the tooltip plugin on every element that
// opts in through data-toggle="tooltip".
$(document).ready(function initTooltips() {
  var tooltipTargets = $('[data-toggle="tooltip"]');
  tooltipTargets.tooltip();
});
</script>
</head>
<body>
<!-- Navigation -->
<nav class="navbar navbar-inverse navbar-fixed-top" role="navigation">
<div class="container topnavlinks">
<div class="navbar-header">
<button type="button" class="navbar-toggle" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a class="fa fa-home fa-lg navbar-brand" href="../docs/home.html">&nbsp;<span class="projectTitle"> Apache Quarks Documentation</span></a>
</div>
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav navbar-right">
<!-- entries without drop-downs appear here -->
<!-- conditional logic to control which topnav appears for the audience defined in the configuration file.-->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">GitHub Repos<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="https://github.com/apache/incubator-quarks" target="_blank">Source code</a></li>
<li><a href="https://github.com/apache/incubator-quarks-website" target="_blank">Website/Documentation</a></li>
</ul>
</li>
<li><a href="https://quarks-edge.github.io/quarks/docs/javadoc/index.html" target="_blank">Javadoc</a></li>
<!-- entries with drop-downs appear here -->
<!-- conditional logic to control which topnav appears for the audience defined in the configuration file.-->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Quarks Resources<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="https://github.com/apache/incubator-quarks/releases" target="_blank">Download</a></li>
<li><a href="../docs/samples.html">Samples</a></li>
<li><a href="../docs/faq.html">FAQ</a></li>
</ul>
</li>
<!-- special insertion -->
<!-- Send feedback function -->
<script>
/**
 * Open a pre-filled feedback e-mail draft for this page.
 *
 * The draft has a fixed subject and a body that names this page and appends
 * the browser's current URL. No recipient is placed after "mailto:".
 *
 * @param {*} href Not read by the function body.
 */
function SendLinkByMail(href) {
  var subject = "Apache Quarks Documentation feedback";
  var message = "I have some feedback about the How can I run several analytics on a tuple concurrently? page: " + window.location.href;

  // Subject and body are percent-encoded so spaces and punctuation survive
  // inside the mailto: query string.
  var mailtoUri = [
    "mailto:?subject=", encodeURIComponent(subject),
    "&body=", encodeURIComponent(message)
  ].join("");

  // Navigating to the mailto: URI hands the draft to the mail handler.
  window.location.href = mailtoUri;
}
</script>
<li><a href="mailto:dev@quarks.incubator.apache.org" target="_blank"><i class="fa fa-envelope-o"></i> Feedback</a></li>
<!--uncomment this block if you want simple search instead of algolia-->
<li>
<!--start search-->
<div id="search-demo-container">
<input type="text" id="search-input" placeholder="search..." aria-label="Search the documentation">
<ul id="results-container"></ul>
</div>
<script src="../js/jekyll-search.js" type="text/javascript"></script>
<script type="text/javascript">
// Connect the top-nav search box (#search-input) and its result list
// (#results-container) to Simple Jekyll Search. Wrapped in an IIFE so the
// options object does not become a global.
(function initSiteSearch() {
  var searchOptions = {
    searchInput: document.getElementById('search-input'),
    resultsContainer: document.getElementById('results-container'),
    dataSource: '../search.json',
    // NOTE(review): the title attribute below is the fixed title of this
    // page, not a per-result placeholder like {url} and {title} -- confirm
    // that is intended.
    searchResultTemplate: '<li><a href="{url}" title="How can I run several analytics on a tuple concurrently?">{title}</a></li>',
    noResultsText: 'No results found.',
    limit: 10,   // presumably the maximum number of results shown -- confirm
    fuzzy: true  // presumably enables approximate matching -- confirm
  };

  SimpleJekyllSearch.init(searchOptions);
})();
</script>
<!--end search-->
</li>
</ul>
</div>
</div>
<!-- /.container -->
</nav>
<!-- Page Content -->
<div class="container">
<div class="col-lg-12">&nbsp;</div>
<!-- Content Row -->
<div class="row">
<!-- Sidebar Column -->
<div class="col-md-3">
<script>
// When the DOM is ready, turn the #mysidebar list into a navgoco accordion
// and bind the "Collapse All" / "Expand All" links.
$(document).ready(function () {
  var sidebarSelector = "#mysidebar";

  // Build a click handler that cancels the link's default action and calls
  // navgoco's 'toggle' on the sidebar with the given flag (false for the
  // collapse link, true for the expand link).
  function makeToggleHandler(expand) {
    return function (event) {
      event.preventDefault();
      $(sidebarSelector).navgoco('toggle', expand);
    };
  }

  // Initialize navgoco with default options
  $(sidebarSelector).navgoco({
    caretHtml: '',
    accordion: true,
    openClass: 'active', // open
    save: true,
    cookie: { name: 'navgoco', expires: false, path: '/' },
    slide: { duration: 400, easing: 'swing' }
  });

  // NOTE(review): on this page #collapseAll and #expandAll appear only inside
  // an HTML comment below the sidebar list, so these bindings match nothing
  // unless that block is uncommented.
  $("#collapseAll").click(makeToggleHandler(false));
  $("#expandAll").click(makeToggleHandler(true));
});
</script>
<ul id="mysidebar" class="nav">
<span class="siteTagline">Quarks</span>
<span class="versionTagline">Version 0.3.0</span>
<li><a href="#">Overview</a>
<ul>
<li><a href="../docs/quarks_index.html">Introduction</a></li>
<li><a href="../docs/faq.html">FAQ</a></li>
</ul>
<li><a href="#">Get Started</a>
<ul>
<li><a href="../docs/quarks-getting-started.html">Getting started guide</a></li>
<li><a href="../docs/common-quarks-operations.html">Common operations</a></li>
</ul>
<li><a href="#">Quarks Cookbook</a>
<ul>
<li><a href="../recipes/recipe_hello_quarks.html">Hello Quarks!</a></li>
<li><a href="../recipes/recipe_source_function.html">Writing a source function</a></li>
<li><a href="../recipes/recipe_value_out_of_range.html">Detecting a sensor value out of expected range</a></li>
<li><a href="../recipes/recipe_different_processing_against_stream.html">Applying different processing against a single stream</a></li>
<li><a href="../recipes/recipe_combining_streams_processing_results.html">Splitting a stream to apply different processing and combining the results into a single stream</a></li>
<li><a href="../recipes/recipe_external_filter_range.html">Using an external configuration file for filter ranges</a></li>
<li><a href="../recipes/recipe_adaptable_filter_range.html">Changing a filter's range</a></li>
<li><a href="../recipes/recipe_adaptable_polling_source.html">Changing a polled source stream's period</a></li>
<li><a href="../recipes/recipe_adaptable_deadtime_filter.html">Using an adaptable deadtime filter</a></li>
<li><a href="../recipes/recipe_dynamic_analytic_control.html">Dynamically enabling analytic flows</a></li>
<li><a href="../recipes/recipe_parallel_analytics.html">How can I run analytics on several tuples in parallel?</a></li>
<li class="active"><a href="../recipes/recipe_concurrent_analytics.html">How can I run several analytics on a tuple concurrently?</a></li>
</ul>
<li><a href="#">Sample Programs</a>
<ul>
<li><a href="../docs/samples.html">Samples</a></li>
<li><a href="../docs/quickstart.html">Quickstart IBM Watson IoT Platform</a></li>
</ul>
<li><a href="#">Using the Console</a>
<ul>
<li><a href="../docs/console.html">Using the console</a></li>
</ul>
<li><a href="#">Get Involved</a>
<ul>
<li><a href="../docs/community.html">How to participate</a></li>
<li><a href="../docs/committers.html">Committers</a></li>
</ul>
<!-- if you aren't using the accordion, uncomment this block:
<p class="external">
<a href="#" id="collapseAll">Collapse All</a> | <a href="#" id="expandAll">Expand All</a>
</p>
-->
<br/>
</li>
</ul>
<div class="row">
<div class="col-md-12">
<!-- this handles the automatic toc. use ## for subheads to auto-generate the on-page minitoc. if you use html tags, you must supply an ID for the heading element in order for it to appear in the minitoc. -->
<script>
// Build the on-page mini table of contents and make its links scroll with a
// small offset.
// NOTE(review): a second element with id="toc" and an identical script also
// appear in the content column; ids should be unique -- confirm which copy
// is intended.
$(document).ready(function () {
  // Generate the TOC list inside #toc from the page's h2/h3/h4 headings.
  $('#toc').toc({ minimumHeaders: 0, listType: 'ul', showSpeed: 0, headers: 'h2,h3,h4' });

  /* this offset helps account for the space taken up by the floating toolbar. */
  $('#toc').on('click', 'a', function () {
    var target = $(this.getAttribute('href'));

    // .offset() returns undefined for an empty set, so reading .top would
    // throw. If the href matches no element, leave the click to the
    // browser's default link handling instead.
    if (!target.length) {
      return true;
    }

    // Scroll so the heading sits 10px below the top of the window.
    $(window).scrollTop(target.offset().top - 10);
    return false; // the scroll was handled here; suppress the default jump
  });
});
</script>
<div id="toc"></div>
</div>
</div>
</div>
<!-- this highlights the active parent class in the navgoco sidebar. this is critical so that the parent expands when you're viewing a page. This must appear below the sidebar code above. Otherwise, if placed inside customscripts.js, the script runs before the sidebar code runs and the class never gets inserted.-->
<script>
// Mark every ancestor <li> of the active sidebar entry as active too.
// addClass is used rather than toggleClass so that an ancestor which already
// carries the class is never switched off.
$("li.active").parents('li').addClass("active");
</script>
<!-- Content Column -->
<div class="col-md-9">
<div class="post-header">
<h1 class="post-title-main">How can I run several analytics on a tuple concurrently?</h1>
</div>
<div class="post-content">
<!-- this handles the automatic toc. use ## for subheads to auto-generate the on-page minitoc. if you use html tags, you must supply an ID for the heading element in order for it to appear in the minitoc. -->
<script>
// Build the on-page mini table of contents and make its links scroll with a
// small offset.
// NOTE(review): an element with id="toc" and an identical script also appear
// earlier, in the sidebar column; ids should be unique -- confirm which copy
// is intended.
$(document).ready(function () {
  // Generate the TOC list inside #toc from the page's h2/h3/h4 headings.
  $('#toc').toc({ minimumHeaders: 0, listType: 'ul', showSpeed: 0, headers: 'h2,h3,h4' });

  /* this offset helps account for the space taken up by the floating toolbar. */
  $('#toc').on('click', 'a', function () {
    var target = $(this.getAttribute('href'));

    // .offset() returns undefined for an empty set, so reading .top would
    // throw. If the href matches no element, leave the click to the
    // browser's default link handling instead.
    if (!target.length) {
      return true;
    }

    // Scroll so the heading sits 10px below the top of the window.
    $(window).scrollTop(target.offset().top - 10);
    return false; // the scroll was handled here; suppress the default jump
  });
});
</script>
<div id="toc"></div>
<a target="_blank" href="https://github.com/apache/incubator-quarks-website/blob/master/site/recipes/recipe_concurrent_analytics.md" class="btn btn-default githubEditButton" role="button"><i class="fa fa-github fa-lg"></i> Edit me</a>
<p>If you have several independent lengthy analytics to perform on each tuple, you may determine that it would be advantageous to perform the analytics concurrently and then combine their results.</p>
<p>The overall processing time for a single tuple is then roughly that of the slowest analytic pipeline instead of the aggregate of each analytic pipeline.</p>
<p>This usage model is in contrast to what&#39;s often referred to as <em>parallel</em> tuple processing where several tuples are processed in parallel in replicated pipeline channels.</p>
<p>For example, given independent analytic pipelines A1, A2, and A3, you want to change the serial processing flow graph from:</p>
<div class="highlight"><pre><code class="language-" data-lang="">sensorReadings&lt;T&gt; -&gt; A1 -&gt; A2 -&gt; A3 -&gt; results&lt;R&gt;
</code></pre></div>
<p>to a flow where the analytics run concurrently in a flow like:</p>
<div class="highlight"><pre><code class="language-" data-lang="">                     |-&gt; A1 -&gt;|
sensorReadings&lt;T&gt; -&gt; |-&gt; A2 -&gt;| -&gt; results&lt;R&gt;
                     |-&gt; A3 -&gt;|
</code></pre></div>
<p>The key to the above flow is to use a <em>barrier</em> to synchronize the results from each of the pipelines so they can be combined into a single result tuple. Each of the concurrent channels also needs a thread to run its analytic pipeline.</p>
<p><code>PlumbingStreams.concurrent()</code> builds a concurrent flow graph for you. Alternatively, you can use <code>PlumbingStreams.barrier()</code> and <code>PlumbingStreams.isolate()</code> and build a concurrent flow graph yourself.</p>
<p>More specifically, <code>concurrent()</code> generates a flow like:</p>
<div class="highlight"><pre><code class="language-" data-lang="">          |-&gt; isolate(1) -&gt; pipeline1 -&gt; |
stream -&gt; |-&gt; isolate(1) -&gt; pipeline2 -&gt; |-&gt; barrier(10) -&gt; combiner
          |-&gt; isolate(1) -&gt; pipeline3 -&gt; |
</code></pre></div>
<p>It&#39;s easy to use <code>concurrent()</code>!</p>
<h2 id="define-the-collection-of-analytic-pipelines-to-run">Define the collection of analytic pipelines to run</h2>
<p>For the moment assume we have defined methods to create each pipeline: <code>a1pipeline()</code>, <code>a2pipeline()</code> and <code>a3pipeline()</code>. In this simple recipe each pipeline receives a <code>TStream&lt;Double&gt;</code> as input and generates a <code>TStream&lt;String&gt;</code> as output.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">List</span><span class="o">&lt;</span><span class="n">Function</span><span class="o">&lt;</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;,</span> <span class="n">TStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;&gt;</span> <span class="n">pipelines</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;&gt;();</span>
<span class="n">pipelines</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">a1pipeline</span><span class="o">());</span>
<span class="n">pipelines</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">a2pipeline</span><span class="o">());</span>
<span class="n">pipelines</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">a3pipeline</span><span class="o">());</span>
</code></pre></div>
<h2 id="define-the-result-combiner">Define the result combiner</h2>
<p>Each pipeline creates one result tuple for each input tuple. The <code>barrier</code> collects one tuple from each pipeline and then creates a list of those tuples. The combiner is invoked with that list to generate the final aggregate result tuple.</p>
<p>In this recipe the combiner is a simple lambda function that returns the input list:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Function</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">combiner</span> <span class="o">=</span> <span class="n">list</span> <span class="o">-&gt;</span> <span class="n">list</span><span class="o">;</span>
</code></pre></div>
<h2 id="build-the-concurrent-flow">Build the concurrent flow</h2>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TStream</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">results</span> <span class="o">=</span> <span class="n">PlumbingStreams</span><span class="o">.</span><span class="na">concurrent</span><span class="o">(</span><span class="n">readings</span><span class="o">,</span> <span class="n">pipelines</span><span class="o">,</span> <span class="n">combiner</span><span class="o">);</span>
</code></pre></div>
<h2 id="define-your-analytic-pipelines">Define your analytic pipelines</h2>
<p>For each analytic pipeline, define a <code>Function&lt;TStream&lt;T&gt;, TStream&lt;U&gt;&gt;</code> that will create the pipeline. That is, define a function that takes a <code>TStream&lt;T&gt;</code> as its input and yields a <code>TStream&lt;U&gt;</code> as its result. Of course, <code>U</code> can be the same type as <code>T</code>.</p>
<p>In this recipe we&#39;ll just define some very simple pipelines and use sleep to simulate some long processing times.</p>
<p>Here&#39;s the A3 pipeline builder:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">static</span> <span class="n">Function</span><span class="o">&lt;</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;,</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">a3pipeline</span><span class="o">()</span> <span class="o">{</span>
<span class="c1">// simple 3 stage pipeline simulating some amount of work by sleeping</span>
<span class="k">return</span> <span class="n">stream</span> <span class="o">-&gt;</span> <span class="n">stream</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">tuple</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">sleep</span><span class="o">(</span><span class="mi">800</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MILLISECONDS</span><span class="o">);</span>
<span class="k">return</span> <span class="s">"This is the a3pipeline result for tuple "</span><span class="o">+</span><span class="n">tuple</span><span class="o">;</span>
<span class="o">}).</span><span class="na">tag</span><span class="o">(</span><span class="s">"a3.stage1"</span><span class="o">)</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">Functions</span><span class="o">.</span><span class="na">identity</span><span class="o">()).</span><span class="na">tag</span><span class="o">(</span><span class="s">"a3.stage2"</span><span class="o">)</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">Functions</span><span class="o">.</span><span class="na">identity</span><span class="o">()).</span><span class="na">tag</span><span class="o">(</span><span class="s">"a3.stage3"</span><span class="o">);</span>
<span class="o">}</span>
</code></pre></div>
<h2 id="the-final-application">The final application</h2>
<p>When the application is run it prints out an aggregate result (a list of one tuple from each pipeline) every second. If the three pipelines were run serially, it would take on the order of 2.4 seconds to generate each aggregate result.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">package</span> <span class="n">quarks</span><span class="o">.</span><span class="na">samples</span><span class="o">.</span><span class="na">topology</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.ArrayList</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.Date</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.List</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">quarks.console.server.HttpServer</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">quarks.function.Function</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">quarks.function.Functions</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">quarks.providers.development.DevelopmentProvider</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">quarks.providers.direct.DirectProvider</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">quarks.samples.utils.sensor.SimpleSimulatedSensor</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">quarks.topology.TStream</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">quarks.topology.Topology</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">quarks.topology.plumbing.PlumbingStreams</span><span class="o">;</span>
<span class="cm">/**
* A recipe for concurrent analytics.
*/</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">ConcurrentRecipe</span> <span class="o">{</span>
<span class="cm">/**
* Concurrently run a collection of long running independent
* analytic pipelines on each tuple.
*/</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="n">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">DirectProvider</span> <span class="n">dp</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DevelopmentProvider</span><span class="o">();</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"development console url: "</span>
<span class="o">+</span> <span class="n">dp</span><span class="o">.</span><span class="na">getServices</span><span class="o">().</span><span class="na">getService</span><span class="o">(</span><span class="n">HttpServer</span><span class="o">.</span><span class="na">class</span><span class="o">).</span><span class="na">getConsoleUrl</span><span class="o">());</span>
<span class="n">Topology</span> <span class="n">top</span> <span class="o">=</span> <span class="n">dp</span><span class="o">.</span><span class="na">newTopology</span><span class="o">(</span><span class="s">"ConcurrentRecipe"</span><span class="o">);</span>
<span class="c1">// Define the list of independent unique analytic pipelines to include</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">Function</span><span class="o">&lt;</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;,</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;&gt;</span> <span class="n">pipelines</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;&gt;();</span>
<span class="n">pipelines</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">a1pipeline</span><span class="o">());</span>
<span class="n">pipelines</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">a2pipeline</span><span class="o">());</span>
<span class="n">pipelines</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">a3pipeline</span><span class="o">());</span>
<span class="c1">// Define the result combiner function. The combiner receives </span>
<span class="c1">// a tuple containing a list of tuples, one from each pipeline, </span>
<span class="c1">// and returns a result tuple of any type from them.</span>
<span class="c1">// In this recipe we'll just return the list.</span>
<span class="n">Function</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span><span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">combiner</span> <span class="o">=</span> <span class="n">list</span> <span class="o">-&gt;</span> <span class="n">list</span><span class="o">;</span>
<span class="c1">// Generate a polled simulated sensor stream</span>
<span class="n">SimpleSimulatedSensor</span> <span class="n">sensor</span> <span class="o">=</span> <span class="k">new</span> <span class="n">SimpleSimulatedSensor</span><span class="o">();</span>
<span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">readings</span> <span class="o">=</span> <span class="n">top</span><span class="o">.</span><span class="na">poll</span><span class="o">(</span><span class="n">sensor</span><span class="o">,</span> <span class="mi">1</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)</span>
<span class="o">.</span><span class="na">tag</span><span class="o">(</span><span class="s">"readings"</span><span class="o">);</span>
<span class="c1">// Build the concurrent analytic pipeline flow</span>
<span class="n">TStream</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">results</span> <span class="o">=</span>
<span class="n">PlumbingStreams</span><span class="o">.</span><span class="na">concurrent</span><span class="o">(</span><span class="n">readings</span><span class="o">,</span> <span class="n">pipelines</span><span class="o">,</span> <span class="n">combiner</span><span class="o">)</span>
<span class="o">.</span><span class="na">tag</span><span class="o">(</span><span class="s">"results"</span><span class="o">);</span>
<span class="c1">// Print out the results.</span>
<span class="n">results</span><span class="o">.</span><span class="na">sink</span><span class="o">(</span><span class="n">list</span> <span class="o">-&gt;</span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="k">new</span> <span class="n">Date</span><span class="o">().</span><span class="na">toString</span><span class="o">()</span> <span class="o">+</span> <span class="s">" results tuple: "</span> <span class="o">+</span> <span class="n">list</span><span class="o">));</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Notice how an aggregate result is generated every second."</span>
<span class="o">+</span> <span class="s">"\nEach aggregate result would take 2.4sec if performed serially."</span><span class="o">);</span>
<span class="n">dp</span><span class="o">.</span><span class="na">submit</span><span class="o">(</span><span class="n">top</span><span class="o">);</span>
<span class="o">}</span>
<span class="cm">/** Function to create analytic pipeline a1 and add it to a stream */</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="n">Function</span><span class="o">&lt;</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;,</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">a1pipeline</span><span class="o">()</span> <span class="o">{</span>
<span class="c1">// a simple 1 stage pipeline simulating some amount of work by sleeping</span>
<span class="k">return</span> <span class="n">stream</span> <span class="o">-&gt;</span> <span class="n">stream</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">tuple</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">sleep</span><span class="o">(</span><span class="mi">800</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MILLISECONDS</span><span class="o">);</span>
<span class="k">return</span> <span class="s">"This is the a1pipeline result for tuple "</span><span class="o">+</span><span class="n">tuple</span><span class="o">;</span>
<span class="o">}).</span><span class="na">tag</span><span class="o">(</span><span class="s">"a1.stage1"</span><span class="o">);</span>
<span class="o">}</span>
<span class="cm">/** Function to create analytic pipeline a2 and add it to a stream */</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="n">Function</span><span class="o">&lt;</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;,</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">a2pipeline</span><span class="o">()</span> <span class="o">{</span>
<span class="c1">// a simple 2 stage pipeline simulating some amount of work by sleeping</span>
<span class="k">return</span> <span class="n">stream</span> <span class="o">-&gt;</span> <span class="n">stream</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">tuple</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">sleep</span><span class="o">(</span><span class="mi">800</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MILLISECONDS</span><span class="o">);</span>
<span class="k">return</span> <span class="s">"This is the a2pipeline result for tuple "</span><span class="o">+</span><span class="n">tuple</span><span class="o">;</span>
<span class="o">}).</span><span class="na">tag</span><span class="o">(</span><span class="s">"a2.stage1"</span><span class="o">)</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">Functions</span><span class="o">.</span><span class="na">identity</span><span class="o">()).</span><span class="na">tag</span><span class="o">(</span><span class="s">"a2.stage2"</span><span class="o">);</span>
<span class="o">}</span>
<span class="cm">/** Function to create analytic pipeline a3 and add it to a stream */</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="n">Function</span><span class="o">&lt;</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;,</span><span class="n">TStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">a3pipeline</span><span class="o">()</span> <span class="o">{</span>
<span class="c1">// a simple 3 stage pipeline simulating some amount of work by sleeping</span>
<span class="k">return</span> <span class="n">stream</span> <span class="o">-&gt;</span> <span class="n">stream</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">tuple</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">sleep</span><span class="o">(</span><span class="mi">800</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MILLISECONDS</span><span class="o">);</span>
<span class="k">return</span> <span class="s">"This is the a3pipeline result for tuple "</span><span class="o">+</span><span class="n">tuple</span><span class="o">;</span>
<span class="o">}).</span><span class="na">tag</span><span class="o">(</span><span class="s">"a3.stage1"</span><span class="o">)</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">Functions</span><span class="o">.</span><span class="na">identity</span><span class="o">()).</span><span class="na">tag</span><span class="o">(</span><span class="s">"a3.stage2"</span><span class="o">)</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">Functions</span><span class="o">.</span><span class="na">identity</span><span class="o">()).</span><span class="na">tag</span><span class="o">(</span><span class="s">"a3.stage3"</span><span class="o">);</span>
<span class="o">}</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="kt">void</span> <span class="n">sleep</span><span class="o">(</span><span class="kt">long</span> <span class="n">period</span><span class="o">,</span> <span class="n">TimeUnit</span> <span class="n">unit</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">RuntimeException</span> <span class="o">{</span>
<span class="k">try</span> <span class="o">{</span>
<span class="n">Thread</span><span class="o">.</span><span class="na">sleep</span><span class="o">(</span><span class="n">unit</span><span class="o">.</span><span class="na">toMillis</span><span class="o">(</span><span class="n">period</span><span class="o">));</span>
<span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">InterruptedException</span> <span class="n">e</span><span class="o">)</span> <span class="o">{</span>
<span class="k">throw</span> <span class="k">new</span> <span class="n">RuntimeException</span><span class="o">(</span><span class="s">"Interrupted"</span><span class="o">,</span> <span class="n">e</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
<div class="tags">
</div>
<!--
<div id="disqus_thread"></div>
<script type="text/javascript">
/* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */
var disqus_shortname = 'idrbwjekyll'; // required: replace example with your forum shortname
/* * * DON'T EDIT BELOW THIS LINE * * */
(function() {
var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
(document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
})();
</script>
<noscript>Please enable JavaScript to view the <a href="https://disqus.com/?ref_noscript">comments powered by Disqus.</a></noscript>
-->
</div>
<footer>
<div class="row">
<div class="col-lg-12 footer">
Site last
generated: May 9, 2016 <br/>
</div>
</div>
<br/>
<div class="row">
<div class="col-md-12">
<p class="small">Apache Quarks is an effort undergoing Incubation at The Apache Software
Foundation (ASF), sponsored by the Incubator. Incubation is required of all newly accepted projects
until a further review indicates that the infrastructure, communications, and decision making process
have stabilized in a manner consistent with other successful ASF projects. While incubation status is
not necessarily a reflection of the completeness or stability of the code, it does indicate that the
project has yet to be fully endorsed by the ASF.</p>
</div>
</div>
<div class="row">
<div class="col-md-12">
<p class="small">Copyright © 2016 The Apache Software Foundation. Licensed under the Apache
License, Version 2.0.
Apache, the Apache Feather logo, and the Apache Incubator project logo are trademarks of The Apache
Software Foundation. All other marks mentioned may be trademarks or registered trademarks of their
respective owners.</p>
</div>
</div>
</footer>
</div><!-- /.col-md-9 -->
</div><!-- /.row -->
</div> <!-- /.container -->
</body>
</html>