<!-- blob: 7f0d6beb2cd52691c888e0f65a314c51350b3d12 [file] [log] [blame] -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="description" content="">
<meta name="keywords" content=" ">
<title>Applying different processing against a single stream | Apache Edgent Documentation</title>
<link rel="stylesheet" type="text/css" href="../css/syntax.css">
<link rel="stylesheet" type="text/css" href="../css/font-awesome.min.css">
<!--<link rel="stylesheet" type="text/css" href="../css/bootstrap.min.css">-->
<link rel="stylesheet" type="text/css" href="../css/modern-business.css">
<link rel="stylesheet" type="text/css" href="../css/lavish-bootstrap.css">
<link rel="stylesheet" type="text/css" href="../css/customstyles.css">
<link rel="stylesheet" type="text/css" href="../css/theme-blue.css">
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/2.1.4/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery-cookie/1.4.1/jquery.cookie.min.js"></script>
<script src="../js/jquery.navgoco.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/anchor-js/2.0.0/anchor.min.js"></script>
<script src="../js/toc.js"></script>
<script src="../js/customscripts.js"></script>
<link rel="shortcut icon" href="../common_images/favicon.ico" type="image/x-icon">
<!-- HTML5 Shim and Respond.js IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/libs/html5shiv/3.7.0/html5shiv.js"></script>
<script src="https://oss.maxcdn.com/libs/respond.js/1.4.2/respond.min.js"></script>
<![endif]-->
<script>
$(document).ready(function () {
    // Enable the tooltip plugin on every element that opts in through
    // data-toggle="tooltip" (the plugin itself comes from the scripts loaded above).
    var tooltipTargets = $('[data-toggle="tooltip"]');
    tooltipTargets.tooltip();
});
</script>
</head>
<body>
<!-- Navigation -->
<nav class="navbar navbar-inverse navbar-fixed-top" role="navigation">
<div class="container topnavlinks">
<div class="navbar-header">
<button type="button" class="navbar-toggle" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a class="fa fa-home fa-lg navbar-brand" href="../docs/home.html">&nbsp;<span class="projectTitle"> Apache Edgent Documentation</span></a>
</div>
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav navbar-right">
<!-- entries without drop-downs appear here -->
<!-- conditional logic to control which topnav appears for the audience defined in the configuration file.-->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">GitHub Repos<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="https://github.com/apache/incubator-edgent" target="_blank">Source code</a></li>
<li><a href="https://github.com/apache/incubator-edgent-samples" target="_blank">Edgent Samples</a></li>
<li><a href="https://github.com/apache/incubator-edgent-website" target="_blank">Website/Documentation</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Javadoc<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="../javadoc/latest">latest</a></li>
<li><a href="../javadoc/r1.2.0">1.2.0</a></li>
<li><a href="../javadoc/r1.1.0">1.1.0</a></li>
<li><a href="../javadoc/r1.0.0">1.0.0</a></li>
<li><a href="../javadoc/r0.4.0">0.4.0</a></li>
</ul>
</li>
<!-- entries with drop-downs appear here -->
<!-- conditional logic to control which topnav appears for the audience defined in the configuration file.-->
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Edgent Resources<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="../docs/downloads.html">Download</a></li>
<li><a href="../docs/faq.html">FAQ</a></li>
<li class="dropdownActive"><a href="/">edgent.apache.org</a></li>
</ul>
</li>
<!-- special insertion -->
<!-- Send feedback function -->
<script>
/**
 * Navigates the browser to a mailto: URI carrying a pre-filled feedback draft.
 *
 * The draft has no recipient. Its subject is fixed, and its body is a fixed
 * sentence naming this page followed by the current page URL. Both parts are
 * percent-encoded with encodeURIComponent before being placed in the URI.
 *
 * @param {*} href Accepted for call compatibility; the function never reads it.
 */
function SendLinkByMail(href) {
    var feedbackSubject = "Apache Edgent Documentation feedback";
    var feedbackBody = "I have some feedback about the Applying different processing against a single stream page: " + window.location.href;

    // Assigning to location.href is what triggers the navigation.
    window.location.href = "mailto:?subject=" + encodeURIComponent(feedbackSubject) +
        "&body=" + encodeURIComponent(feedbackBody);
}
</script>
<li><a href="mailto:dev@edgent.incubator.apache.org" target="_blank"><i class="fa fa-envelope-o"></i> Feedback</a></li>
<!--uncomment this block if you want simple search instead of algolia-->
<li>
<!--start search-->
<div id="search-demo-container">
<input type="text" id="search-input" placeholder="search..." aria-label="Search the documentation">
<ul id="results-container"></ul>
</div>
<script src="../js/jekyll-search.js" type="text/javascript"></script>
<script type="text/javascript">
// Wire the navbar search box (#search-input) to the results list
// (#results-container), using ../search.json as the data source.
// NOTE(review): option semantics are defined by the search library loaded from
// ../js/jekyll-search.js; confirm against the bundled version.
SimpleJekyllSearch.init({
    searchInput: document.getElementById('search-input'),
    resultsContainer: document.getElementById('results-container'),
    dataSource: '../search.json',
    // {url} and {title} are per-hit placeholders. No title attribute is set on
    // the link: a hard-coded one would give every hit the same tooltip (this
    // page's title) regardless of which page the hit points to.
    searchResultTemplate: '<li><a href="{url}">{title}</a></li>',
    noResultsText: 'No results found.',
    limit: 10,
    fuzzy: true
});
</script>
<!--end search-->
</li>
</ul>
</div>
</div>
<!-- /.container -->
</nav>
<!-- Page Content -->
<div class="container">
<div class="col-lg-12">&nbsp;</div>
<!-- Content Row -->
<div class="row">
<!-- Sidebar Column -->
<div class="col-md-3">
<script>
$(document).ready(function () {
    var SIDEBAR = "#mysidebar";

    // Settings handed to the navgoco plugin for the sidebar menu.
    // NOTE(review): the meaning of each option is defined by jquery.navgoco;
    // confirm against the bundled version before changing any of them.
    var sidebarSettings = {
        caretHtml: '',
        accordion: true,
        openClass: 'active', // class used for an open item
        save: true,
        cookie: { name: 'navgoco', expires: false, path: '/' },
        slide: { duration: 400, easing: 'swing' }
    };
    $(SIDEBAR).navgoco(sidebarSettings);

    // Binds a click on `trigger` to navgoco's 'toggle' command with the given
    // flag, suppressing the link's default navigation.
    function wireToggle(trigger, flag) {
        $(trigger).click(function (evt) {
            evt.preventDefault();
            $(SIDEBAR).navgoco('toggle', flag);
        });
    }

    wireToggle("#collapseAll", false);
    wireToggle("#expandAll", true);
});
</script>
<ul id="mysidebar" class="nav">
<span class="siteTagline">Edgent</span>
<span class="versionTagline">Version 1.2.0-incubating</span>
<li><a href="#">Overview</a>
<ul>
<li><a href="../docs/edgent_index.html">Introduction</a></li>
<li><a href="../docs/home.html">Edgent Overview</a></li>
<li><a href="../docs/power-of-edgent.html">The Power of Edgent</a></li>
<li><a href="../docs/faq.html">FAQ</a></li>
</ul>
<li><a href="#">Get Started</a>
<ul>
<li><a href="../docs/downloads.html">Downloads</a></li>
<li><a href="../docs/edgent-getting-started.html">Getting Started Guide</a></li>
<li><a href="../docs/edgent-getting-started-samples.html">Quickstart with Edgent Samples</a></li>
<li><a href="../docs/application-development.html">Understanding App Development</a></li>
<li><a href="../docs/quickstart.html">Quickstart IBM Watson IoT Platform</a></li>
<li><a href="../docs/streaming-concepts.html">Streaming concepts</a></li>
<li><a href="../docs/common-edgent-operations.html">Common operations</a></li>
</ul>
<li><a href="#">Edgent Cookbook</a>
<ul>
<li><a href="../recipes/recipe_hello_edgent.html">Hello Edgent!</a></li>
<li><a href="../recipes/recipe_source_function.html">Writing a source function</a></li>
<li><a href="../recipes/recipe_value_out_of_range.html">Detecting a sensor value out of expected range</a></li>
<li class="active"><a href="../recipes/recipe_different_processing_against_stream.html">Applying different processing against a single stream</a></li>
<li><a href="../recipes/recipe_combining_streams_processing_results.html">Splitting a stream to apply different processing and combining the results into a single stream</a></li>
<li><a href="../recipes/recipe_external_filter_range.html">Using an external configuration file for filter ranges</a></li>
<li><a href="../recipes/recipe_adaptable_filter_range.html">Changing a filter's range</a></li>
<li><a href="../recipes/recipe_adaptable_polling_source.html">Changing a polled source stream's period</a></li>
<li><a href="../recipes/recipe_adaptable_deadtime_filter.html">Using an adaptable deadtime filter</a></li>
<li><a href="../recipes/recipe_dynamic_analytic_control.html">Dynamically enabling analytic flows</a></li>
<li><a href="../recipes/recipe_parallel_analytics.html">How can I run analytics on several tuples in parallel?</a></li>
<li><a href="../recipes/recipe_concurrent_analytics.html">How can I run several analytics on a tuple concurrently?</a></li>
<li><a href="../recipes/recipe_writing_a_connector.html">How do I write a connector?</a></li>
</ul>
<li><a href="#">Using the Console</a>
<ul>
<li><a href="../docs/console.html">Using the console</a></li>
</ul>
<li><a href="#">Get Involved</a>
<ul>
<li><a href="../docs/community.html">How to participate</a></li>
<li><a href="../docs/committers.html">Committers</a></li>
</ul>
<!-- if you aren't using the accordion, uncomment this block:
<p class="external">
<a href="#" id="collapseAll">Collapse All</a> | <a href="#" id="expandAll">Expand All</a>
</p>
-->
<br/>
</li>
</ul>
<div class="row">
<div class="col-md-12">
<!-- this handles the automatic toc. use ## for subheads to auto-generate the on-page minitoc. if you use html tags, you must supply an ID for the heading element in order for it to appear in the minitoc. -->
<script>
$(document).ready(function () {
    var tocContainer = $('#toc');

    // Generate the on-page mini-TOC from the h2/h3/h4 headings.
    tocContainer.toc({ minimumHeaders: 0, listType: 'ul', showSpeed: 0, headers: 'h2,h3,h4' });

    // Delegated click handler for the generated TOC links.
    tocContainer.on('click', 'a', function () {
        var fragment = this.getAttribute('href');

        // Resolve the heading by id instead of passing the raw href to the
        // selector engine, so ids containing selector metacharacters still work.
        var heading = (fragment && fragment.charAt(0) === '#')
            ? document.getElementById(fragment.slice(1))
            : null;

        // No matching heading: leave the click to the browser's default
        // handling rather than throwing on an undefined offset.
        if (!heading) {
            return;
        }

        /* this offset helps account for the space taken up by the floating toolbar. */
        $(window).scrollTop($(heading).offset().top - 10);
        return false;
    });
});
</script>
<div id="toc"></div>
</div>
</div>
</div>
<!-- this highlights the active parent class in the navgoco sidebar. this is critical so that the parent expands when you're viewing a page. This must appear below the sidebar code above. Otherwise, if placed inside customscripts.js, the script runs before the sidebar code runs and the class never gets inserted.-->
<script>
// Mark every ancestor <li> of the current page's sidebar entry as active, so
// the enclosing section carries the class the sidebar uses for open items.
// addClass is used instead of toggleClass so that an ancestor which already
// has the class keeps it rather than being switched off.
$("li.active").parents('li').addClass("active");
</script>
<!-- Content Column -->
<div class="col-md-9">
<div class="post-header">
<h1 class="post-title-main">Applying different processing against a single stream</h1>
</div>
<div class="post-content">
<!-- The on-page mini-TOC is generated once, by the script next to the #toc container in the sidebar column. No second #toc container or second copy of the init script belongs here: element ids must be unique, and $('#toc') resolves only to the first matching element, so a copy in this column would re-run the generator against the sidebar TOC and bind its click handler twice while its own container stayed empty. -->
<a target="_blank" href="https://github.com/apache/incubator-edgent-website/blob/master/site/recipes/recipe_different_processing_against_stream.md" class="btn btn-default githubEditButton" role="button"><i class="fa fa-github fa-lg"></i> Edit me</a>
<p>In the previous <a href="recipe_value_out_of_range">recipe</a>, we learned how to filter a stream to obtain the interesting sensor readings and ignore the mundane data. Typically, a user scenario is more involved, where data is processed using different stream operations. Consider the following scenario, for example.</p>
<p>Suppose a package delivery company would like to monitor the gas mileage of their delivery trucks using embedded sensors. They would like to apply different analytics to the sensor data that can be used to make more informed business decisions. For instance, if a truck is reporting consistently poor gas mileage readings, the company might want to consider replacing that truck to save on gas costs. Perhaps the company also wants to convert the sensor readings to JSON format in order to easily display the data on a web page. It may also be interested in determining the expected gallons of gas used based on the current gas mileage.</p>
<p>In this instance, we can take the stream of gas mileage sensor readings and apply multiple types of processing against it so that we end up with streams that serve different purposes.</p>
<h2 id="setting-up-the-application">Setting up the application</h2>
<p>We assume that the environment has been set up following the steps outlined in the <a href="../docs/edgent-getting-started">Getting started guide</a>. Let&#39;s begin by creating a <code>DirectProvider</code> and <code>Topology</code>. We choose a <code>DevelopmentProvider</code> so that we can view the topology graph using the console URL (refer to the <a href="../docs/console">Application console</a> page for a more detailed explanation of this provider). The gas mileage bounds, initial gas mileage value, and the number of miles in a typical delivery route have also been defined.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">java.text.DecimalFormat</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">com.google.gson.JsonObject</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.analytics.sensors.Ranges</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.console.server.HttpServer</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.providers.development.DevelopmentProvider</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.providers.direct.DirectProvider</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.samples.utils.sensor.SimpleSimulatedSensor</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.topology.TStream</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.topology.Topology</span><span class="o">;</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">ApplyDifferentProcessingAgainstStream</span> <span class="o">{</span>
<span class="cm">/**
* Gas mileage (in miles per gallon, or mpg) value bounds
*/</span>
<span class="kd">static</span> <span class="kt">double</span> <span class="n">MPG_LOW</span> <span class="o">=</span> <span class="mf">7.0</span><span class="o">;</span>
<span class="kd">static</span> <span class="kt">double</span> <span class="n">MPG_HIGH</span> <span class="o">=</span> <span class="mf">14.0</span><span class="o">;</span>
<span class="cm">/**
* Initial gas mileage sensor value
*/</span>
<span class="kd">static</span> <span class="kt">double</span> <span class="n">INITIAL_MPG</span> <span class="o">=</span> <span class="mf">10.5</span><span class="o">;</span>
<span class="cm">/**
* Hypothetical value for the number of miles in a typical delivery route
*/</span>
<span class="kd">static</span> <span class="kt">double</span> <span class="n">ROUTE_MILES</span> <span class="o">=</span> <span class="mi">80</span><span class="o">;</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">DirectProvider</span> <span class="n">dp</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DevelopmentProvider</span><span class="o">();</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="n">dp</span><span class="o">.</span><span class="na">getServices</span><span class="o">().</span><span class="na">getService</span><span class="o">(</span><span class="n">HttpServer</span><span class="o">.</span><span class="na">class</span><span class="o">).</span><span class="na">getConsoleUrl</span><span class="o">());</span>
<span class="n">Topology</span> <span class="n">top</span> <span class="o">=</span> <span class="n">dp</span><span class="o">.</span><span class="na">newTopology</span><span class="o">(</span><span class="s">"GasMileageSensor"</span><span class="o">);</span>
<span class="c1">// The rest of the code pieces belong here</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
<h2 id="generating-gas-mileage-sensor-readings">Generating gas mileage sensor readings</h2>
<p>The next step is to simulate a stream of gas mileage readings using <a href="https://github.com/apache/incubator-edgent/blob/master/samples/utils/src/main/java/org/apache/edgent/samples/utils/sensor/SimpleSimulatedSensor.java"><code>SimpleSimulatedSensor</code></a>. We set the initial gas mileage and delta factor in the first two arguments. The last argument ensures that the sensor reading falls in an acceptable range (between 7.0 mpg and 14.0 mpg). In our <code>main()</code>, we use the <code>poll()</code> method to generate a flow of tuples (readings), where each tuple arrives every second.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Generate a stream of gas mileage sensor readings</span>
<span class="n">SimpleSimulatedSensor</span> <span class="n">mpgSensor</span> <span class="o">=</span> <span class="k">new</span> <span class="n">SimpleSimulatedSensor</span><span class="o">(</span><span class="n">INITIAL_MPG</span><span class="o">,</span>
<span class="mf">0.4</span><span class="o">,</span> <span class="n">Ranges</span><span class="o">.</span><span class="na">closed</span><span class="o">(</span><span class="n">MPG_LOW</span><span class="o">,</span> <span class="n">MPG_HIGH</span><span class="o">));</span>
<span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">mpgReadings</span> <span class="o">=</span> <span class="n">top</span><span class="o">.</span><span class="na">poll</span><span class="o">(</span><span class="n">mpgSensor</span><span class="o">,</span> <span class="mi">1</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">);</span>
</code></pre></div>
<h2 id="applying-different-processing-to-the-stream">Applying different processing to the stream</h2>
<p>The company can now perform analytics on the <code>mpgReadings</code> stream and feed it to different functions.</p>
<p>First, we can filter the stream so that only the gas mileage values that are considered poor (9.0 mpg or lower) remain, and tag the resulting stream for easier viewing in the console.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Filter out the poor gas mileage readings</span>
<span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">poorMpg</span> <span class="o">=</span> <span class="n">mpgReadings</span>
<span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">mpg</span> <span class="o">-&gt;</span> <span class="n">mpg</span> <span class="o">&lt;=</span> <span class="mf">9.0</span><span class="o">).</span><span class="na">tag</span><span class="o">(</span><span class="s">"filtered"</span><span class="o">);</span>
</code></pre></div>
<p>If the company also wants the readings to be in JSON, we can easily create a new stream and convert from type <code>Double</code> to <code>JsonObject</code>.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Map Double to JsonObject</span>
<span class="n">TStream</span><span class="o">&lt;</span><span class="n">JsonObject</span><span class="o">&gt;</span> <span class="n">json</span> <span class="o">=</span> <span class="n">mpgReadings</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">mpg</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">JsonObject</span> <span class="n">jObj</span> <span class="o">=</span> <span class="k">new</span> <span class="n">JsonObject</span><span class="o">();</span>
<span class="n">jObj</span><span class="o">.</span><span class="na">addProperty</span><span class="o">(</span><span class="s">"gasMileage"</span><span class="o">,</span> <span class="n">mpg</span><span class="o">);</span>
<span class="k">return</span> <span class="n">jObj</span><span class="o">;</span>
<span class="o">}).</span><span class="na">tag</span><span class="o">(</span><span class="s">"mapped"</span><span class="o">);</span>
</code></pre></div>
<p>In addition, we can calculate the estimated gallons of gas used based on the current gas mileage using <code>modify</code>.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Modify gas mileage stream to obtain a stream containing the estimated gallons of gas used</span>
<span class="n">DecimalFormat</span> <span class="n">df</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DecimalFormat</span><span class="o">(</span><span class="s">"#.#"</span><span class="o">);</span>
<span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">gallonsUsed</span> <span class="o">=</span> <span class="n">mpgReadings</span>
<span class="o">.</span><span class="na">modify</span><span class="o">(</span><span class="n">mpg</span> <span class="o">-&gt;</span> <span class="n">Double</span><span class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span class="n">df</span><span class="o">.</span><span class="na">format</span><span class="o">(</span><span class="n">ROUTE_MILES</span> <span class="o">/</span> <span class="n">mpg</span><span class="o">))).</span><span class="na">tag</span><span class="o">(</span><span class="s">"modified"</span><span class="o">);</span>
</code></pre></div>
<p>The three examples demonstrated here are only a small subset of the many possibilities of stream processing.</p>
<p>With each of these resulting streams, the company can perform further analytics, but at this point, we terminate the streams by printing out the tuples on each stream.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Terminate the streams</span>
<span class="n">poorMpg</span><span class="o">.</span><span class="na">sink</span><span class="o">(</span><span class="n">mpg</span> <span class="o">-&gt;</span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Poor gas mileage! "</span> <span class="o">+</span> <span class="n">mpg</span> <span class="o">+</span> <span class="s">" mpg"</span><span class="o">));</span>
<span class="n">json</span><span class="o">.</span><span class="na">sink</span><span class="o">(</span><span class="n">mpg</span> <span class="o">-&gt;</span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"JSON: "</span> <span class="o">+</span> <span class="n">mpg</span><span class="o">));</span>
<span class="n">gallonsUsed</span><span class="o">.</span><span class="na">sink</span><span class="o">(</span><span class="n">gas</span> <span class="o">-&gt;</span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Gallons of gas: "</span> <span class="o">+</span> <span class="n">gas</span> <span class="o">+</span> <span class="s">"\n"</span><span class="o">));</span>
</code></pre></div>
<p>We end our application by submitting the <code>Topology</code> to the provider with <code>dp.submit(top)</code>, as shown in the final application below.</p>
<h2 id="observing-the-output">Observing the output</h2>
<p>When the final application is run, the output looks something like the following:</p>
<div class="highlight"><pre><code class="language-" data-lang="">JSON: {"gasMileage":9.5}
Gallons of gas: 8.4
JSON: {"gasMileage":9.2}
Gallons of gas: 8.7
Poor gas mileage! 9.0 mpg
JSON: {"gasMileage":9.0}
Gallons of gas: 8.9
Poor gas mileage! 8.8 mpg
JSON: {"gasMileage":8.8}
Gallons of gas: 9.1
</code></pre></div>
<h2 id="a-look-at-the-topology-graph">A look at the topology graph</h2>
<p>Let&#39;s see what the topology graph looks like. We can view it using the console URL that was printed to standard output at the start of the application. We see that the original stream is fanned out to three separate streams, and the <code>filter</code>, <code>map</code>, and <code>modify</code> operations are applied.</p>
<p><img src="images/different_processing_against_stream_topology_graph.jpg" alt="Topology graph showing the original stream fanned out to three streams with the filter, map, and modify operations applied"></p>
<h2 id="the-final-application">The final application</h2>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">java.text.DecimalFormat</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">com.google.gson.JsonObject</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.analytics.sensors.Ranges</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.console.server.HttpServer</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.providers.development.DevelopmentProvider</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.providers.direct.DirectProvider</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.samples.utils.sensor.SimpleSimulatedSensor</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.topology.TStream</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.edgent.topology.Topology</span><span class="o">;</span>
<span class="cm">/**
* Fan out stream and perform different analytics on the resulting streams.
*/</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">ApplyDifferentProcessingAgainstStream</span> <span class="o">{</span>
<span class="cm">/**
* Gas mileage (in miles per gallon, or mpg) value bounds
*/</span>
<span class="kd">static</span> <span class="kt">double</span> <span class="n">MPG_LOW</span> <span class="o">=</span> <span class="mf">7.0</span><span class="o">;</span>
<span class="kd">static</span> <span class="kt">double</span> <span class="n">MPG_HIGH</span> <span class="o">=</span> <span class="mf">14.0</span><span class="o">;</span>
<span class="cm">/**
* Initial gas mileage sensor value
*/</span>
<span class="kd">static</span> <span class="kt">double</span> <span class="n">INITIAL_MPG</span> <span class="o">=</span> <span class="mf">10.5</span><span class="o">;</span>
<span class="cm">/**
* Hypothetical value for the number of miles in a typical delivery route
*/</span>
<span class="kd">static</span> <span class="kt">double</span> <span class="n">ROUTE_MILES</span> <span class="o">=</span> <span class="mi">80</span><span class="o">;</span>
<span class="cm">/**
* Polls a simulated delivery truck sensor to periodically obtain
* gas mileage readings (in miles/gallon). Feed the stream of sensor
* readings to different functions (filter, map, and modify).
*/</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">DirectProvider</span> <span class="n">dp</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DevelopmentProvider</span><span class="o">();</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="n">dp</span><span class="o">.</span><span class="na">getServices</span><span class="o">().</span><span class="na">getService</span><span class="o">(</span><span class="n">HttpServer</span><span class="o">.</span><span class="na">class</span><span class="o">).</span><span class="na">getConsoleUrl</span><span class="o">());</span>
<span class="n">Topology</span> <span class="n">top</span> <span class="o">=</span> <span class="n">dp</span><span class="o">.</span><span class="na">newTopology</span><span class="o">(</span><span class="s">"GasMileageSensor"</span><span class="o">);</span>
<span class="c1">// Generate a stream of gas mileage sensor readings</span>
<span class="n">SimpleSimulatedSensor</span> <span class="n">mpgSensor</span> <span class="o">=</span> <span class="k">new</span> <span class="n">SimpleSimulatedSensor</span><span class="o">(</span><span class="n">INITIAL_MPG</span><span class="o">,</span>
<span class="mf">0.4</span><span class="o">,</span> <span class="n">Ranges</span><span class="o">.</span><span class="na">closed</span><span class="o">(</span><span class="n">MPG_LOW</span><span class="o">,</span> <span class="n">MPG_HIGH</span><span class="o">));</span>
<span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">mpgReadings</span> <span class="o">=</span> <span class="n">top</span><span class="o">.</span><span class="na">poll</span><span class="o">(</span><span class="n">mpgSensor</span><span class="o">,</span> <span class="mi">1</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">);</span>
<span class="c1">// Filter out the poor gas mileage readings</span>
<span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">poorMpg</span> <span class="o">=</span> <span class="n">mpgReadings</span>
<span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">mpg</span> <span class="o">-&gt;</span> <span class="n">mpg</span> <span class="o">&lt;=</span> <span class="mf">9.0</span><span class="o">).</span><span class="na">tag</span><span class="o">(</span><span class="s">"filtered"</span><span class="o">);</span>
<span class="c1">// Map Double to JsonObject</span>
<span class="n">TStream</span><span class="o">&lt;</span><span class="n">JsonObject</span><span class="o">&gt;</span> <span class="n">json</span> <span class="o">=</span> <span class="n">mpgReadings</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">mpg</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">JsonObject</span> <span class="n">jObj</span> <span class="o">=</span> <span class="k">new</span> <span class="n">JsonObject</span><span class="o">();</span>
<span class="n">jObj</span><span class="o">.</span><span class="na">addProperty</span><span class="o">(</span><span class="s">"gasMileage"</span><span class="o">,</span> <span class="n">mpg</span><span class="o">);</span>
<span class="k">return</span> <span class="n">jObj</span><span class="o">;</span>
<span class="o">}).</span><span class="na">tag</span><span class="o">(</span><span class="s">"mapped"</span><span class="o">);</span>
<span class="c1">// Modify gas mileage stream to obtain a stream containing the estimated gallons of gas used</span>
<span class="n">DecimalFormat</span> <span class="n">df</span> <span class="o">=</span> <span class="k">new</span> <span class="n">DecimalFormat</span><span class="o">(</span><span class="s">"#.#"</span><span class="o">);</span>
<span class="n">TStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">gallonsUsed</span> <span class="o">=</span> <span class="n">mpgReadings</span>
<span class="o">.</span><span class="na">modify</span><span class="o">(</span><span class="n">mpg</span> <span class="o">-&gt;</span> <span class="n">Double</span><span class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span class="n">df</span><span class="o">.</span><span class="na">format</span><span class="o">(</span><span class="n">ROUTE_MILES</span> <span class="o">/</span> <span class="n">mpg</span><span class="o">))).</span><span class="na">tag</span><span class="o">(</span><span class="s">"modified"</span><span class="o">);</span>
<span class="c1">// Terminate the streams</span>
<span class="n">poorMpg</span><span class="o">.</span><span class="na">sink</span><span class="o">(</span><span class="n">mpg</span> <span class="o">-&gt;</span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Poor gas mileage! "</span> <span class="o">+</span> <span class="n">mpg</span> <span class="o">+</span> <span class="s">" mpg"</span><span class="o">));</span>
<span class="n">json</span><span class="o">.</span><span class="na">sink</span><span class="o">(</span><span class="n">mpg</span> <span class="o">-&gt;</span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"JSON: "</span> <span class="o">+</span> <span class="n">mpg</span><span class="o">));</span>
<span class="n">gallonsUsed</span><span class="o">.</span><span class="na">sink</span><span class="o">(</span><span class="n">gas</span> <span class="o">-&gt;</span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Gallons of gas: "</span> <span class="o">+</span> <span class="n">gas</span> <span class="o">+</span> <span class="s">"\n"</span><span class="o">));</span>
<span class="n">dp</span><span class="o">.</span><span class="na">submit</span><span class="o">(</span><span class="n">top</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
<div class="tags">
</div>
<!--
<div id="disqus_thread"></div>
<script type="text/javascript">
/* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */
var disqus_shortname = 'idrbwjekyll'; // required: replace example with your forum shortname
/* * * DON'T EDIT BELOW THIS LINE * * */
(function() {
var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
(document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
})();
</script>
<noscript>Please enable JavaScript to view the <a href="https://disqus.com/?ref_noscript">comments powered by Disqus.</a></noscript>
-->
</div>
<footer>
<div class="row">
<div class="col-lg-12 footer">
Site last
generated: Apr 3, 2019 <br/>
</div>
</div>
<br/>
<div class="row">
<div class="col-md-12">
<p class="small">Apache Edgent is an effort undergoing Incubation at The Apache Software
Foundation (ASF), sponsored by the Incubator. Incubation is required of all newly accepted projects
until a further review indicates that the infrastructure, communications, and decision making process
have stabilized in a manner consistent with other successful ASF projects. While incubation status is
not necessarily a reflection of the completeness or stability of the code, it does indicate that the
project has yet to be fully endorsed by the ASF.</p>
</div>
</div>
<div class="row">
<div class="col-md-12">
<p class="small">Copyright © 2016 The Apache Software Foundation. Licensed under the Apache
License, Version 2.0.
Apache, the Apache Feather logo, and the Apache Incubator project logo are trademarks of The Apache
Software Foundation. All other marks mentioned may be trademarks or registered trademarks of their
respective owners.</p>
</div>
</div>
<div class="container">
<div class="row">
<div>
<img class="img-responsive center-block" src="../img/edgent_incubation.png" style="display: block; margin: auto;" alt="Apache Incubator project logo">
</div>
</div>
</div>
</footer>
</div><!-- /.col-md-9 -->
</div><!-- /.row -->
</div> <!-- /.container -->
</body>
</html>