<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<title>Joining Streams in Storm Core</title>
<!-- Bootstrap core CSS -->
<link href="/assets/css/bootstrap.min.css" rel="stylesheet">
<!-- Bootstrap theme -->
<link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">
<!-- Custom styles for this template -->
<link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
<link href="/css/style.css" rel="stylesheet">
<link href="/assets/css/owl.theme.css" rel="stylesheet">
<link href="/assets/css/owl.carousel.css" rel="stylesheet">
<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
<script type="text/javascript" src="/assets/js/storm.js"></script>
<!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
<!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<header>
<div class="container-fluid">
<div class="row">
<div class="col-md-5">
<a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
</div>
<div class="col-md-5">
<h1>Version: 1.2.3</h1>
</div>
<div class="col-md-2">
<a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
</div>
</div>
</div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
<div class="container-fluid">
<div class="navbar-header">
<button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
<ul class="nav navbar-nav">
<li><a href="/index.html" id="home">Home</a></li>
<li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
<li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/releases/2.3.0/index.html">2.3.0</a></li>
<li><a href="/releases/2.2.0/index.html">2.2.0</a></li>
<li><a href="/releases/2.1.0/index.html">2.1.0</a></li>
<li><a href="/releases/2.0.0/index.html">2.0.0</a></li>
<li><a href="/releases/1.2.3/index.html">1.2.3</a></li>
</ul>
</li>
<li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
<li><a href="/contribute/People.html">People</a></li>
<li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
</ul>
</li>
<li><a href="/2021/09/27/storm230-released.html" id="news">News</a></li>
</ul>
</nav>
</div>
</div>
<div class="container-fluid">
<h1 class="page-title">Joining Streams in Storm Core</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><p>Storm core supports joining multiple data streams into one with the help of <code>JoinBolt</code>.
<code>JoinBolt</code> is a Windowed bolt, i.e. it waits for the configured window duration to match up the
tuples among the streams being joined. This helps align the streams within a Window boundary.</p>
<p>Each of <code>JoinBolt</code>&#39;s incoming data streams must be Fields Grouped on a single field. A stream
should only be joined with the other streams using the field on which it has been Fields Grouped.<br>
Knowing this will help you understand the join syntax described below.</p>
<h2 id="performing-joins">Performing Joins</h2>
<p>Consider the following SQL join involving 4 tables:</p>
<div class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">select</span> <span class="n">userId</span><span class="p">,</span> <span class="n">key4</span><span class="p">,</span> <span class="n">key2</span><span class="p">,</span> <span class="n">key3</span>
<span class="k">from</span> <span class="n">table1</span>
<span class="k">inner</span> <span class="k">join</span> <span class="n">table2</span> <span class="k">on</span> <span class="n">table2</span><span class="p">.</span><span class="n">userId</span> <span class="o">=</span> <span class="n">table1</span><span class="p">.</span><span class="n">key1</span>
<span class="k">inner</span> <span class="k">join</span> <span class="n">table3</span> <span class="k">on</span> <span class="n">table3</span><span class="p">.</span><span class="n">key3</span> <span class="o">=</span> <span class="n">table2</span><span class="p">.</span><span class="n">userId</span>
<span class="k">left</span> <span class="k">join</span> <span class="n">table4</span> <span class="k">on</span> <span class="n">table4</span><span class="p">.</span><span class="n">key4</span> <span class="o">=</span> <span class="n">table3</span><span class="p">.</span><span class="n">key3</span>
</code></pre></div>
<p>Similar joins could be expressed on tuples generated by 4 spouts using <code>JoinBolt</code>:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JoinBolt</span> <span class="n">jbolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">JoinBolt</span><span class="o">(</span><span class="s">"spout1"</span><span class="o">,</span> <span class="s">"key1"</span><span class="o">)</span> <span class="c1">// from spout1 </span>
<span class="o">.</span><span class="na">join</span> <span class="o">(</span><span class="s">"spout2"</span><span class="o">,</span> <span class="s">"userId"</span><span class="o">,</span> <span class="s">"spout1"</span><span class="o">)</span> <span class="c1">// inner join spout2 on spout2.userId = spout1.key1</span>
<span class="o">.</span><span class="na">join</span> <span class="o">(</span><span class="s">"spout3"</span><span class="o">,</span> <span class="s">"key3"</span><span class="o">,</span> <span class="s">"spout2"</span><span class="o">)</span> <span class="c1">// inner join spout3 on spout3.key3 = spout2.userId </span>
<span class="o">.</span><span class="na">leftJoin</span> <span class="o">(</span><span class="s">"spout4"</span><span class="o">,</span> <span class="s">"key4"</span><span class="o">,</span> <span class="s">"spout3"</span><span class="o">)</span> <span class="c1">// left join spout4 on spout4.key4 = spout3.key3</span>
<span class="o">.</span><span class="na">select</span> <span class="o">(</span><span class="s">"userId, key4, key2, spout3:key3"</span><span class="o">)</span> <span class="c1">// choose output fields</span>
<span class="o">.</span><span class="na">withTumblingWindow</span><span class="o">(</span> <span class="k">new</span> <span class="n">Duration</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">)</span> <span class="o">)</span> <span class="o">;</span>
<span class="n">topoBuilder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"joiner"</span><span class="o">,</span> <span class="n">jbolt</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span>
<span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"spout1"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key1"</span><span class="o">)</span> <span class="o">)</span>
<span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"spout2"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"userId"</span><span class="o">)</span> <span class="o">)</span>
<span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"spout3"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key3"</span><span class="o">)</span> <span class="o">)</span>
<span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"spout4"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key4"</span><span class="o">)</span> <span class="o">);</span>
</code></pre></div>
<p>The bolt constructor takes two arguments. The first introduces the data from <code>spout1</code>
as the first stream, and the second specifies that field <code>key1</code> is always used when joining this stream with the other streams.
The component name specified must refer to the spout or bolt that is directly connected to the join bolt.
Here, data received from <code>spout1</code> must be fields grouped on <code>key1</code>. Similarly, each <code>leftJoin()</code> and <code>join()</code> method
call introduces a new stream along with the field to use for the join. As seen in the example above, the same fields grouping
requirement applies to these streams as well. The third argument to the join methods names a previously introduced stream with which
to join.</p>
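<p>To make the difference between <code>join()</code> and <code>leftJoin()</code> concrete, the following plain-Java sketch joins one window's worth of tuples from two streams on a key. It does not use Storm's API and is not how <code>JoinBolt</code> is implemented: the class <code>WindowJoinSketch</code>, its methods, the sample field values, and the use of a <code>Map</code> to stand in for a tuple are all assumptions made for illustration.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a tuple is modelled as a Map from field name to value.
final class WindowJoinSketch {

    // Joins one window of "left" tuples with one window of "right" tuples.
    // keepUnmatchedLeft = false gives inner-join behaviour, true gives left-join behaviour.
    static List<Map<String, Object>> join(List<Map<String, Object>> left, String leftKey,
                                          List<Map<String, Object>> right, String rightKey,
                                          boolean keepUnmatchedLeft) {
        // Index the right-hand window by its join field so each lookup is a hash probe.
        Map<Object, List<Map<String, Object>>> index = new HashMap<>();
        for (Map<String, Object> r : right) {
            index.computeIfAbsent(r.get(rightKey), k -> new ArrayList<>()).add(r);
        }

        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : left) {
            List<Map<String, Object>> matches = index.get(l.get(leftKey));
            if (matches == null) {
                if (keepUnmatchedLeft) {
                    out.add(new HashMap<>(l)); // left join: keep the unmatched left tuple
                }
                continue; // inner join: drop it
            }
            for (Map<String, Object> r : matches) {
                Map<String, Object> merged = new HashMap<>(r);
                merged.putAll(l); // on a field-name clash, the left value wins in this sketch
                out.add(merged);
            }
        }
        return out;
    }

    // Convenience helper: tuple("key1", 1, "city", "Oslo") builds a two-field tuple.
    static Map<String, Object> tuple(Object... fieldsAndValues) {
        Map<String, Object> t = new HashMap<>();
        for (int i = 0; i + 1 < fieldsAndValues.length; i += 2) {
            t.put((String) fieldsAndValues[i], fieldsAndValues[i + 1]);
        }
        return t;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> spout1 = new ArrayList<>();
        spout1.add(tuple("key1", 1, "city", "Oslo"));
        spout1.add(tuple("key1", 2, "city", "Lima"));
        List<Map<String, Object>> spout2 = new ArrayList<>();
        spout2.add(tuple("userId", 1, "name", "ann"));

        System.out.println(join(spout1, "key1", spout2, "userId", false).size()); // prints 1
        System.out.println(join(spout1, "key1", spout2, "userId", true).size());  // prints 2
    }
}
```

<p>In this sketch the inner join drops the <code>spout1</code> tuple that has no partner in the window, while the left join keeps it without any fields from <code>spout2</code>.</p>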
<p>The <code>select()</code> method is used to specify the output fields. The argument to <code>select</code> is a comma separated list of fields.
Individual field names can be prefixed with a stream name to disambiguate between the same field name occurring in
multiple streams as follows: <code>.select(&quot;spout3:key3, spout4:key3&quot;)</code>. Nested tuple types are supported if the
nesting has been done using <code>Map</code>s. For example <code>outer.inner.innermost</code> refers to a field that is nested three levels
deep where <code>outer</code> and <code>inner</code> are of type <code>Map</code>. </p>
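<p>The dotted notation for nested fields can be pictured as a walk through nested <code>Map</code>s. The sketch below is a plain-Java illustration under that assumption; <code>NestedFieldSketch</code> and <code>lookup</code> are hypothetical names, and <code>JoinBolt</code>&#39;s own resolution logic may differ in detail (for example in how it treats missing levels).</p>

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: resolves "outer.inner.innermost" against nested Maps.
final class NestedFieldSketch {

    // Walks one Map level per dot-separated name.
    // Returns null if any intermediate level is missing or is not a Map.
    static Object lookup(Map<String, Object> tuple, String dottedPath) {
        Object current = tuple;
        for (String part : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(part);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> inner = new HashMap<>();
        inner.put("innermost", 42);
        Map<String, Object> outer = new HashMap<>();
        outer.put("inner", inner);
        Map<String, Object> tuple = new HashMap<>();
        tuple.put("outer", outer);

        System.out.println(lookup(tuple, "outer.inner.innermost"));   // prints 42
        System.out.println(lookup(tuple, "outer.missing.innermost")); // prints null
    }
}
```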
<p>A stream name prefix is not allowed on the field names passed to the <code>join()</code> and <code>leftJoin()</code> methods, but nested fields are supported.</p>
<p>The call to <code>withTumblingWindow()</code> above configures the join window to be a 10-minute tumbling window. Since <code>JoinBolt</code>
is a windowed bolt, the <code>withWindow</code> method can also be used to configure a sliding window (see the tips section below).</p>
<h2 id="stream-names-and-join-order">Stream names and Join order</h2>
<ul>
<li>Stream names must be introduced (in constructor or as 1st arg to various join methods) before being referred
to (in the 3rd argument of the join methods). Forward referencing of stream names, as shown below, is not allowed:</li>
</ul>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="k">new</span> <span class="n">JoinBolt</span><span class="o">(</span> <span class="s">"spout1"</span><span class="o">,</span> <span class="s">"key1"</span><span class="o">)</span>
<span class="o">.</span><span class="na">join</span> <span class="o">(</span> <span class="s">"spout2"</span><span class="o">,</span> <span class="s">"userId"</span><span class="o">,</span> <span class="s">"spout3"</span><span class="o">)</span> <span class="c1">//not allowed. spout3 not yet introduced</span>
<span class="o">.</span><span class="na">join</span> <span class="o">(</span> <span class="s">"spout3"</span><span class="o">,</span> <span class="s">"key3"</span><span class="o">,</span> <span class="s">"spout1"</span><span class="o">)</span>
</code></pre></div>
<ul>
<li>Internally, the joins will be performed in the order expressed by the user.</li>
</ul>
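<p>The two rules above can be sketched as a small builder that records streams in the order they are introduced and rejects a reference to a stream it has not seen yet. This is a plain-Java illustration only; <code>JoinOrderSketch</code> and its methods are hypothetical names, and the exception type and message are assumptions rather than what <code>JoinBolt</code> actually throws.</p>

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: tracks which stream names have been introduced so far.
final class JoinOrderSketch {
    private final Set<String> introduced = new HashSet<>();
    private final List<String> order = new ArrayList<>();

    // Plays the role of the constructor's first argument: introduces the first stream.
    JoinOrderSketch(String firstStream) {
        introduced.add(firstStream);
        order.add(firstStream);
    }

    // Introduces newStream and joins it with priorStream, which must already be known.
    JoinOrderSketch join(String newStream, String priorStream) {
        if (!introduced.contains(priorStream)) {
            throw new IllegalArgumentException(
                "'" + priorStream + "' is referenced before being introduced");
        }
        introduced.add(newStream);
        order.add(newStream);
        return this;
    }

    // Streams in the order the user expressed them, which is also the join order here.
    List<String> joinOrder() {
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        JoinOrderSketch ok = new JoinOrderSketch("spout1")
            .join("spout2", "spout1")
            .join("spout3", "spout2");
        System.out.println(ok.joinOrder()); // prints [spout1, spout2, spout3]

        try {
            new JoinOrderSketch("spout1").join("spout2", "spout3"); // forward reference
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```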
<h2 id="joining-based-on-stream-names">Joining based on Stream names</h2>
<p>For simplicity, Storm topologies often use the <code>default</code> stream. Topologies can also use named streams
instead of <code>default</code> streams. To support such topologies, <code>JoinBolt</code> can be configured to use stream
names, instead of source component (spout/bolt) names, via the constructor&#39;s first argument:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="k">new</span> <span class="n">JoinBolt</span><span class="o">(</span><span class="n">JoinBolt</span><span class="o">.</span><span class="na">Selector</span><span class="o">.</span><span class="na">STREAM</span><span class="o">,</span> <span class="s">"stream1"</span><span class="o">,</span> <span class="s">"key1"</span><span class="o">)</span>
<span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="s">"stream2"</span><span class="o">,</span> <span class="s">"key2"</span><span class="o">,</span> <span class="s">"stream1"</span><span class="o">)</span>
<span class="o">...</span>
</code></pre></div>
<p>The first argument <code>JoinBolt.Selector.STREAM</code> informs the bolt that <code>stream1/2/3/4</code> refer to named streams
(as opposed to names of upstream spouts/bolts).</p>
<p>The example below joins two named streams emitted by four bolts:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JoinBolt</span> <span class="n">jbolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">JoinBolt</span><span class="o">(</span><span class="n">JoinBolt</span><span class="o">.</span><span class="na">Selector</span><span class="o">.</span><span class="na">STREAM</span><span class="o">,</span> <span class="s">"stream1"</span><span class="o">,</span> <span class="s">"key1"</span><span class="o">)</span>
<span class="o">.</span><span class="na">join</span> <span class="o">(</span><span class="s">"stream2"</span><span class="o">,</span> <span class="s">"userId"</span><span class="o">,</span> <span class="s">"stream1"</span> <span class="o">)</span>
<span class="o">.</span><span class="na">select</span> <span class="o">(</span><span class="s">"userId, key1, key2"</span><span class="o">)</span>
<span class="o">.</span><span class="na">withTumblingWindow</span><span class="o">(</span> <span class="k">new</span> <span class="n">Duration</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">)</span> <span class="o">)</span> <span class="o">;</span>
<span class="n">topoBuilder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"joiner"</span><span class="o">,</span> <span class="n">jbolt</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span>
<span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"bolt1"</span><span class="o">,</span> <span class="s">"stream1"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key1"</span><span class="o">)</span> <span class="o">)</span>
<span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"bolt2"</span><span class="o">,</span> <span class="s">"stream1"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key1"</span><span class="o">)</span> <span class="o">)</span>
<span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"bolt3"</span><span class="o">,</span> <span class="s">"stream2"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"userId"</span><span class="o">)</span> <span class="o">)</span>
<span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"bolt4"</span><span class="o">,</span> <span class="s">"stream1"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key1"</span><span class="o">)</span> <span class="o">);</span>
</code></pre></div>
<p>In the above example, <code>bolt1</code> (for instance) may also be emitting other streams, but the join bolt
subscribes only to <code>stream1</code> and <code>stream2</code> from the different bolts. <code>stream1</code> from <code>bolt1</code>, <code>bolt2</code> and <code>bolt4</code>
is treated as a single stream and joined against <code>stream2</code> from <code>bolt3</code>.</p>
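<p>One way to picture the effect of the selector is as the choice of which name identifies an incoming tuple for the join: its stream id or its source component. The plain-Java sketch below groups the four subscriptions from the example under each choice. <code>SelectorSketch</code> and its methods are hypothetical, and the <code>SOURCE</code> constant is an assumption of this sketch for the default, component-based behaviour.</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: shows which name a tuple is filed under for each selector.
final class SelectorSketch {
    enum Selector { STREAM, SOURCE }

    // The name the join uses for a tuple arriving from sourceComponent on streamId.
    static String joinName(Selector selector, String sourceComponent, String streamId) {
        return selector == Selector.STREAM ? streamId : sourceComponent;
    }

    // Each subscription is {sourceComponent, streamId}; the result maps every
    // join-side name to the components that feed it, in subscription order.
    static Map<String, List<String>> group(Selector selector, String[][] subscriptions) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String[] s : subscriptions) {
            grouped.computeIfAbsent(joinName(selector, s[0], s[1]), k -> new ArrayList<>())
                   .add(s[0]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        String[][] subscriptions = {
            {"bolt1", "stream1"}, {"bolt2", "stream1"},
            {"bolt3", "stream2"}, {"bolt4", "stream1"}
        };
        // prints {stream1=[bolt1, bolt2, bolt4], stream2=[bolt3]}
        System.out.println(group(Selector.STREAM, subscriptions));
        // prints {bolt1=[bolt1], bolt2=[bolt2], bolt3=[bolt3], bolt4=[bolt4]}
        System.out.println(group(Selector.SOURCE, subscriptions));
    }
}
```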
<h2 id="limitations">Limitations:</h2>
<ol>
<li><p>Currently only INNER and LEFT joins are supported. </p></li>
<li><p>Unlike SQL, which allows joining the same table on different keys to different tables, here each stream must use the same single
field in every join it takes part in. Fields grouping ensures that the right tuples are routed to the right instances of the join bolt. Consequently, the
fields grouping field must be the same as the join field for correct results. To perform joins on multiple fields, the fields
can be combined into one field and then sent to the join bolt.</p></li>
</ol>
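<p>For the multi-field workaround in the second limitation, an upstream bolt could emit an extra field that encodes all of the join fields. The sketch below shows one possible encoding in plain Java; <code>CompositeKeySketch</code> and <code>compositeKey</code> are hypothetical names, and the length-prefixed format is an assumption chosen so that different value combinations cannot collapse into the same key.</p>

```java
// Illustrative only: builds one join key out of several field values.
final class CompositeKeySketch {

    // Encodes each part as "<length>:<text>" so that ("ab", "c") and ("a", "bc")
    // produce different keys, which a plain concatenation would not guarantee.
    static String compositeKey(Object... parts) {
        StringBuilder sb = new StringBuilder();
        for (Object part : parts) {
            String text = String.valueOf(part);
            sb.append(text.length()).append(':').append(text);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(compositeKey("us", 42)); // prints 2:us2:42
        System.out.println(compositeKey("ab", "c").equals(compositeKey("a", "bc"))); // prints false
    }
}
```

<p>Every stream taking part in the join would then be fields grouped on, and joined on, this combined field.</p>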
<h2 id="tips">Tips:</h2>
<ol>
<li><p>Joins can be CPU and memory intensive. The more data accumulated in the current window (proportional to window
length), the longer the join takes. A short sliding interval (a few seconds, for example) triggers frequent
joins. Consequently, performance can suffer with large window lengths, small sliding intervals, or both.</p></li>
<li><p>Duplication of joined records across windows is possible when using Sliding Windows. This is because the tuples continue to exist
across multiple windows when using Sliding Windows.</p></li>
<li><p>If message timeouts are enabled, ensure the timeout setting (topology.message.timeout.secs) is large enough to comfortably
accommodate the window size, plus the additional processing by other spouts and bolts.</p></li>
<li><p>Joining a window of two streams with M and N elements each can, <em>in the worst case</em>, produce MxN elements, with every output tuple
anchored to one tuple from each input stream. This can mean a lot of output tuples from <code>JoinBolt</code> and even more ACKs for downstream bolts
to emit. This can place substantial pressure on the messaging system and dramatically slow down the topology.
To manage the load on the messaging subsystem, it is advisable to:</p>
<ul>
<li>Increase the worker&#39;s heap (topology.worker.max.heap.size.mb).</li>
<li><strong>If</strong> ACKing is not necessary for your topology, disable ACKers (topology.acker.executors=0).</li>
<li>Disable event logger (topology.eventlogger.executors=0).</li>
<li>Turn off topology debugging (topology.debug=false).</li>
<li>Set topology.max.spout.pending to a value large enough to accommodate an estimated full window&#39;s worth of tuples plus some headroom.
This helps mitigate the possibility of spouts emitting excessive tuples when the messaging subsystem is experiencing excessive load. This situation
can occur when its value is set to null.</li>
<li>Lastly, keep the window size to the minimum value necessary for solving the problem at hand.</li>
</ul></li>
</ol>
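<p>The arithmetic behind the tips on sliding-window duplicates and worst-case output size can be sketched in plain Java. The window model here is a simplification assumed for illustration (windows end at every multiple of the sliding interval and cover the interval <code>(end - length, end]</code>); it is not a description of Storm&#39;s window manager, and <code>WindowSizingSketch</code> and its methods are hypothetical names.</p>

```java
// Illustrative only: back-of-the-envelope numbers for join windows.
final class WindowSizingSketch {

    // Worst case: every tuple on one side matches every tuple on the other.
    static long worstCaseOutputs(int m, int n) {
        return (long) m * n;
    }

    // Counts the windows that contain both timestamps, i.e. how many times the
    // same joined pair would be emitted. Windows end at slide, 2*slide, ... up to
    // lastEnd, and each covers (end - length, end]. All values share one time unit.
    static int emissions(long tsA, long tsB, long length, long slide, long lastEnd) {
        if (slide <= 0) {
            throw new IllegalArgumentException("slide must be positive");
        }
        int count = 0;
        for (long end = slide; end <= lastEnd; end += slide) {
            long start = end - length;
            boolean hasA = tsA > start && tsA <= end;
            boolean hasB = tsB > start && tsB <= end;
            if (hasA && hasB) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(worstCaseOutputs(1000, 2000)); // prints 2000000
        System.out.println(emissions(1, 2, 10, 5, 30));   // sliding: prints 2
        System.out.println(emissions(1, 2, 10, 10, 30));  // tumbling: prints 1
    }
}
```

<p>With a window length of 10 and a sliding interval of 5, the same pair falls into two overlapping windows and is joined twice; with a tumbling window of the same length it is joined once.</p>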
</div>
</div>
</div>
</div>
<footer>
<div class="container-fluid">
<div class="row">
<div class="col-md-3">
<div class="footer-widget">
<h5>Meetups</h5>
<ul class="latest-news">
<li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm &amp; Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
<li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm &amp; Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
<li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
<li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
<li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
<li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
<!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> -->
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>About Apache Storm</h5>
<p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>First Look</h5>
<ul class="footer-list">
<li><a href="/releases/current/Rationale.html">Rationale</a></li>
<li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
<li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
<li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>Documentation</h5>
<ul class="footer-list">
<li><a href="/releases/current/index.html">Index</a></li>
<li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
<li><a href="/releases/current/FAQ.html">FAQ</a></li>
</ul>
</div>
</div>
</div>
<hr/>
<div class="row">
<div class="col-md-12">
<p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved.
<br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation.
<br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
</div>
</div>
</div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span>
</body>
</html>