| <!DOCTYPE html> |
| <html> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| |
| <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="/favicon.ico" type="image/x-icon"> |
| |
| <title>Joining Streams in Storm Core</title> |
| |
| <!-- Bootstrap core CSS --> |
| <link href="/assets/css/bootstrap.min.css" rel="stylesheet"> |
| <!-- Bootstrap theme --> |
| <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet"> |
| |
| <!-- Custom styles for this template --> |
| <link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css"> |
| <link href="/css/style.css" rel="stylesheet"> |
| <link href="/assets/css/owl.theme.css" rel="stylesheet"> |
| <link href="/assets/css/owl.carousel.css" rel="stylesheet"> |
| <script type="text/javascript" src="/assets/js/jquery.min.js"></script> |
| <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script> |
| <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script> |
| <script type="text/javascript" src="/assets/js/storm.js"></script> |
| <!-- Just for debugging purposes. Don't actually copy these 2 lines! --> |
| <!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]--> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| |
| |
| <body> |
| <header> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-5"> |
| <a href="/index.html"><img src="/images/logo.png" class="logo" /></a> |
| </div> |
| <div class="col-md-5"> |
| |
| <h1>Version: 2.0.0</h1> |
| |
| </div> |
| <div class="col-md-2"> |
| <a href="/downloads.html" class="btn-std btn-block btn-download">Download</a> |
| </div> |
| </div> |
| </div> |
| </header> |
| <!--Header End--> |
| <!--Navigation Begin--> |
| <div class="navbar" role="banner"> |
| <div class="container-fluid"> |
| <div class="navbar-header"> |
| <button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse"> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| </div> |
| <nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation"> |
| <ul class="nav navbar-nav"> |
| <li><a href="/index.html" id="home">Home</a></li> |
| <li><a href="/getting-help.html" id="getting-help">Getting Help</a></li> |
| <li><a href="/about/integrates.html" id="project-info">Project Information</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| |
| |
| <li><a href="/releases/2.0.0/index.html">2.0.0</a></li> |
| |
| |
| |
| <li><a href="/releases/1.2.3/index.html">1.2.3</a></li> |
| |
| |
| </ul> |
| </li> |
| <li><a href="/talksAndVideos.html">Talks and Slideshows</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li> |
| <li><a href="/contribute/People.html">People</a></li> |
| <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> |
| </ul> |
| </li> |
| <li><a href="/2019/07/18/storm123-released.html" id="news">News</a></li> |
| </ul> |
| </nav> |
| </div> |
| </div> |
| |
| |
| |
| <div class="container-fluid"> |
| <h1 class="page-title">Joining Streams in Storm Core</h1> |
| <div class="row"> |
| <div class="col-md-12"> |
| <!-- Documentation --> |
| |
| <p class="post-meta"></p> |
| |
| <div class="documentation-content"><p>Storm core supports joining multiple data streams into one with the help of <code>JoinBolt</code>. |
| <code>JoinBolt</code> is a Windowed bolt, i.e. it waits for the configured window duration to match up the |
| tuples among the streams being joined. This helps align the streams within a Window boundary.</p> |
| |
| <p>Each of <code>JoinBolt</code>'s incoming data streams must be Fields Grouped on a single field. A stream |
| should only be joined with the other streams using the field on which it has been FieldsGrouped.<br> |
| Keeping this in mind makes the join syntax described below easier to understand. </p> |
| |
| <h2 id="performing-joins">Performing Joins</h2> |
| |
| <p>Consider the following SQL join involving 4 tables:</p> |
| <div class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">select</span> <span class="n">userId</span><span class="p">,</span> <span class="n">key4</span><span class="p">,</span> <span class="n">key2</span><span class="p">,</span> <span class="n">key3</span> |
| <span class="k">from</span> <span class="n">table1</span> |
| <span class="k">inner</span> <span class="k">join</span> <span class="n">table2</span> <span class="k">on</span> <span class="n">table2</span><span class="p">.</span><span class="n">userId</span> <span class="o">=</span> <span class="n">table1</span><span class="p">.</span><span class="n">key1</span> |
| <span class="k">inner</span> <span class="k">join</span> <span class="n">table3</span> <span class="k">on</span> <span class="n">table3</span><span class="p">.</span><span class="n">key3</span> <span class="o">=</span> <span class="n">table2</span><span class="p">.</span><span class="n">userId</span> |
| <span class="k">left</span> <span class="k">join</span> <span class="n">table4</span> <span class="k">on</span> <span class="n">table4</span><span class="p">.</span><span class="n">key4</span> <span class="o">=</span> <span class="n">table3</span><span class="p">.</span><span class="n">key3</span> |
| </code></pre></div> |
| <p>Similar joins could be expressed on tuples generated by 4 spouts using <code>JoinBolt</code>:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JoinBolt</span> <span class="n">jbolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">JoinBolt</span><span class="o">(</span><span class="s">"spout1"</span><span class="o">,</span> <span class="s">"key1"</span><span class="o">)</span> <span class="c1">// from spout1 </span> |
| <span class="o">.</span><span class="na">join</span> <span class="o">(</span><span class="s">"spout2"</span><span class="o">,</span> <span class="s">"userId"</span><span class="o">,</span> <span class="s">"spout1"</span><span class="o">)</span> <span class="c1">// inner join spout2 on spout2.userId = spout1.key1</span> |
| <span class="o">.</span><span class="na">join</span> <span class="o">(</span><span class="s">"spout3"</span><span class="o">,</span> <span class="s">"key3"</span><span class="o">,</span> <span class="s">"spout2"</span><span class="o">)</span> <span class="c1">// inner join spout3 on spout3.key3 = spout2.userId </span> |
| <span class="o">.</span><span class="na">leftJoin</span> <span class="o">(</span><span class="s">"spout4"</span><span class="o">,</span> <span class="s">"key4"</span><span class="o">,</span> <span class="s">"spout3"</span><span class="o">)</span> <span class="c1">// left join spout4 on spout4.key4 = spout3.key3</span> |
| <span class="o">.</span><span class="na">select</span> <span class="o">(</span><span class="s">"userId, key4, key2, spout3:key3"</span><span class="o">)</span> <span class="c1">// choose output fields</span> |
| <span class="o">.</span><span class="na">withTumblingWindow</span><span class="o">(</span> <span class="k">new</span> <span class="n">Duration</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">)</span> <span class="o">)</span> <span class="o">;</span> |
| |
| <span class="n">topoBuilder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"joiner"</span><span class="o">,</span> <span class="n">jbolt</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"spout1"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key1"</span><span class="o">)</span> <span class="o">)</span> |
| <span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"spout2"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"userId"</span><span class="o">)</span> <span class="o">)</span> |
| <span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"spout3"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key3"</span><span class="o">)</span> <span class="o">)</span> |
| <span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"spout4"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key4"</span><span class="o">)</span> <span class="o">);</span> |
| </code></pre></div> |
| <p>The bolt constructor takes two arguments. The 1st argument introduces the data from <code>spout1</code> |
| as the first stream, and the 2nd argument specifies that field <code>key1</code> will always be used when joining it with the other streams. |
| The component name specified must refer to the spout or bolt that is directly connected to the Join bolt. |
| Here, data received from <code>spout1</code> must be fields grouped on <code>key1</code>. Similarly, each <code>leftJoin()</code> and <code>join()</code> method |
| call introduces a new stream along with the field to use for the join. As seen in the above example, the same FieldsGrouping |
| requirement applies to these streams as well. The 3rd argument to the join methods refers to a previously introduced stream with which |
| to join.</p> |
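| |
| <p>To make the difference between <code>join()</code> and <code>leftJoin()</code> concrete, the following plain-Java sketch matches two in-memory lists of tuples, roughly as would happen for the contents of a single window. It does not use the Storm API: the class <code>WindowJoinSketch</code>, its <code>join</code> helper and the sample data are illustrative assumptions, not <code>JoinBolt</code>'s actual implementation.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| public class WindowJoinSketch { |
| |
|     // Pairs each left tuple with the right tuples whose rightField equals its leftField. |
|     // keepUnmatched=false behaves like an inner join, keepUnmatched=true like a left join. |
|     static List&lt;Map&lt;String, Object&gt;&gt; join(List&lt;Map&lt;String, Object&gt;&gt; left, String leftField, |
|                                           List&lt;Map&lt;String, Object&gt;&gt; right, String rightField, |
|                                           boolean keepUnmatched) { |
|         // Index the right side by its join field. |
|         Map&lt;Object, List&lt;Map&lt;String, Object&gt;&gt;&gt; index = new HashMap&lt;&gt;(); |
|         for (Map&lt;String, Object&gt; r : right) { |
|             index.computeIfAbsent(r.get(rightField), k -&gt; new ArrayList&lt;&gt;()).add(r); |
|         } |
|         List&lt;Map&lt;String, Object&gt;&gt; out = new ArrayList&lt;&gt;(); |
|         for (Map&lt;String, Object&gt; l : left) { |
|             List&lt;Map&lt;String, Object&gt;&gt; matches = index.get(l.get(leftField)); |
|             if (matches == null) { |
|                 if (keepUnmatched) { |
|                     out.add(new HashMap&lt;&gt;(l)); // left join keeps the unmatched left tuple |
|                 } |
|                 continue; |
|             } |
|             for (Map&lt;String, Object&gt; r : matches) { |
|                 Map&lt;String, Object&gt; joined = new HashMap&lt;&gt;(l); |
|                 joined.putAll(r); // same-named fields from the right side overwrite the left |
|                 out.add(joined); |
|             } |
|         } |
|         return out; |
|     } |
| |
|     public static void main(String[] args) { |
|         List&lt;Map&lt;String, Object&gt;&gt; spout1 = List.of(Map.of("key1", 1), Map.of("key1", 2)); |
|         List&lt;Map&lt;String, Object&gt;&gt; spout2 = List.of(Map.of("userId", 1, "key2", "a")); |
|         System.out.println(join(spout1, "key1", spout2, "userId", false).size()); // prints 1 |
|         System.out.println(join(spout1, "key1", spout2, "userId", true).size());  // prints 2 |
|     } |
| } |
| </code></pre></div> |
| <p>With the sample data, only the tuple with <code>key1 = 1</code> finds a partner, so the inner variant emits one result while the left variant also keeps the unmatched tuple.</p> |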
| |
| <p>The <code>select()</code> method is used to specify the output fields. The argument to <code>select</code> is a comma-separated list of fields. |
| Individual field names can be prefixed with a stream name to disambiguate between the same field name occurring in |
| multiple streams as follows: <code>.select("spout3:key3, spout4:key3")</code>. Nested tuple types are supported if the |
| nesting has been done using <code>Map</code>s. For example <code>outer.inner.innermost</code> refers to a field that is nested three levels |
| deep where <code>outer</code> and <code>inner</code> are of type <code>Map</code>. </p> |
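| |
| <p>As a rough illustration of how a dotted name such as <code>outer.inner.innermost</code> can be resolved against nested <code>Map</code>s, consider the plain-Java sketch below. The class <code>NestedFieldSketch</code> and its <code>lookup</code> method are hypothetical helpers written for this page and are not part of the Storm API.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.Map; |
| |
| public class NestedFieldSketch { |
| |
|     // Walks a dot-separated path through nested Maps. |
|     // Returns null if any level is missing or is not a Map. |
|     static Object lookup(Map&lt;String, Object&gt; tuple, String path) { |
|         Object current = tuple; |
|         for (String part : path.split("\\.")) { |
|             if (!(current instanceof Map)) { |
|                 return null; |
|             } |
|             current = ((Map&lt;?, ?&gt;) current).get(part); |
|         } |
|         return current; |
|     } |
| |
|     public static void main(String[] args) { |
|         Map&lt;String, Object&gt; tuple = Map.of("outer", Map.of("inner", Map.of("innermost", 42))); |
|         System.out.println(lookup(tuple, "outer.inner.innermost")); // prints 42 |
|     } |
| } |
| </code></pre></div> |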
| |
| <p>Stream name prefix is not allowed for the fields in any of the join() arguments, but nested fields are supported. </p> |
| |
| <p>The call to <code>withTumblingWindow()</code> above configures the join window to be a 10 minute tumbling window. Since <code>JoinBolt</code> |
| is a Windowed Bolt, we can also use the <code>withWindow</code> method to configure it as a sliding window (see the Tips section below). </p> |
| |
| <h2 id="stream-names-and-join-order">Stream names and Join order</h2> |
| |
| <ul> |
| <li>Stream names must be introduced (in constructor or as 1st arg to various join methods) before being referred |
| to (in the 3rd argument of the join methods). Forward referencing of stream names, as shown below, is not allowed:</li> |
| </ul> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="k">new</span> <span class="n">JoinBolt</span><span class="o">(</span> <span class="s">"spout1"</span><span class="o">,</span> <span class="s">"key1"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">join</span> <span class="o">(</span> <span class="s">"spout2"</span><span class="o">,</span> <span class="s">"userId"</span><span class="o">,</span> <span class="s">"spout3"</span><span class="o">)</span> <span class="c1">//not allowed. spout3 not yet introduced</span> |
| <span class="o">.</span><span class="na">join</span> <span class="o">(</span> <span class="s">"spout3"</span><span class="o">,</span> <span class="s">"key3"</span><span class="o">,</span> <span class="s">"spout1"</span><span class="o">)</span> |
| </code></pre></div> |
| <ul> |
| <li>Internally, the joins will be performed in the order expressed by the user.</li> |
| </ul> |
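| <p>The ordering rule can be pictured with a small plain-Java sketch that records stream names as they are introduced and rejects a forward reference. The class <code>JoinOrderSketch</code>, its two-argument <code>join</code> (the join field is omitted for brevity) and the choice of <code>IllegalArgumentException</code> are assumptions made for illustration; they do not describe how <code>JoinBolt</code> itself reports the error.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.LinkedHashSet; |
| import java.util.Set; |
| |
| public class JoinOrderSketch { |
|     // Streams in the order they were introduced, which is also the join order. |
|     private final Set&lt;String&gt; introduced = new LinkedHashSet&lt;&gt;(); |
| |
|     JoinOrderSketch(String firstStream) { |
|         introduced.add(firstStream); |
|     } |
| |
|     // Introduces newStream and joins it with priorStream, which must already be known. |
|     JoinOrderSketch join(String newStream, String priorStream) { |
|         if (!introduced.contains(priorStream)) { |
|             throw new IllegalArgumentException( |
|                 "'" + priorStream + "' is referenced before being introduced"); |
|         } |
|         introduced.add(newStream); |
|         return this; |
|     } |
| |
|     Set&lt;String&gt; order() { |
|         return introduced; |
|     } |
| |
|     public static void main(String[] args) { |
|         // Valid: every referenced stream was introduced earlier. |
|         System.out.println( |
|             new JoinOrderSketch("spout1").join("spout2", "spout1").join("spout3", "spout2").order()); |
|         // Invalid: spout3 is referenced before it is introduced. |
|         try { |
|             new JoinOrderSketch("spout1").join("spout2", "spout3"); |
|         } catch (IllegalArgumentException e) { |
|             System.out.println("rejected: " + e.getMessage()); |
|         } |
|     } |
| } |
| </code></pre></div> |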
| |
| <h2 id="joining-based-on-stream-names">Joining based on Stream names</h2> |
| |
| <p>For simplicity, Storm topologies often use the <code>default</code> stream. Topologies can also use named streams |
| instead of <code>default</code> streams. To support such topologies, <code>JoinBolt</code> can be configured to use stream |
| names, instead of source component (spout/bolt) names, via the constructor's first argument:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="k">new</span> <span class="n">JoinBolt</span><span class="o">(</span><span class="n">JoinBolt</span><span class="o">.</span><span class="na">Selector</span><span class="o">.</span><span class="na">STREAM</span><span class="o">,</span> <span class="s">"stream1"</span><span class="o">,</span> <span class="s">"key1"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="s">"stream2"</span><span class="o">,</span> <span class="s">"key2"</span><span class="o">,</span> <span class="s">"stream1"</span><span class="o">)</span> |
| <span class="o">...</span> |
| </code></pre></div> |
| <p>The first argument <code>JoinBolt.Selector.STREAM</code> informs the bolt that the names passed to the constructor and the join methods (here <code>stream1</code> and <code>stream2</code>) refer to named streams |
| (as opposed to names of upstream spouts/bolts).</p> |
| |
| <p>The example below joins two named streams emitted by four upstream bolts:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JoinBolt</span> <span class="n">jbolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">JoinBolt</span><span class="o">(</span><span class="n">JoinBolt</span><span class="o">.</span><span class="na">Selector</span><span class="o">.</span><span class="na">STREAM</span><span class="o">,</span> <span class="s">"stream1"</span><span class="o">,</span> <span class="s">"key1"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">join</span> <span class="o">(</span><span class="s">"stream2"</span><span class="o">,</span> <span class="s">"userId"</span><span class="o">,</span> <span class="s">"stream1"</span> <span class="o">)</span> |
| <span class="o">.</span><span class="na">select</span> <span class="o">(</span><span class="s">"userId, key1, key2"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withTumblingWindow</span><span class="o">(</span> <span class="k">new</span> <span class="n">Duration</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">MINUTES</span><span class="o">)</span> <span class="o">)</span> <span class="o">;</span> |
| |
| <span class="n">topoBuilder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"joiner"</span><span class="o">,</span> <span class="n">jbolt</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"bolt1"</span><span class="o">,</span> <span class="s">"stream1"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key1"</span><span class="o">)</span> <span class="o">)</span> |
| <span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"bolt2"</span><span class="o">,</span> <span class="s">"stream1"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key1"</span><span class="o">)</span> <span class="o">)</span> |
| <span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"bolt3"</span><span class="o">,</span> <span class="s">"stream2"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"userId"</span><span class="o">)</span> <span class="o">)</span> |
| <span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">"bolt4"</span><span class="o">,</span> <span class="s">"stream1"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key1"</span><span class="o">)</span> <span class="o">);</span> |
| </code></pre></div> |
| <p>In the above example, it is possible that <code>bolt1</code>, for example, also emits other streams. However, the join bolt |
| subscribes only to <code>stream1</code> and <code>stream2</code> from the different bolts. <code>stream1</code> from <code>bolt1</code>, <code>bolt2</code> and <code>bolt4</code> |
| is treated as a single stream and joined against <code>stream2</code> from <code>bolt3</code>.</p> |
| |
| <h2 id="limitations">Limitations:</h2> |
| |
| <ol> |
| <li><p>Currently only INNER and LEFT joins are supported. </p></li> |
| <li><p>Unlike SQL, which allows joining the same table on different keys to different tables, here each stream must always be joined on the same single field. |
| Fields Grouping ensures the right tuples are routed to the right instances of a Join Bolt. Consequently, the |
| FieldsGrouping field must be the same as the join field for correct results. To perform joins on multiple fields, the fields |
| can be combined into one field and then sent to the Join bolt. </p></li> |
| </ol> |
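| <p>One possible way to combine several fields into a single join field, as suggested in the second limitation, is sketched below in plain Java. The helper <code>withCompositeKey</code>, the field names and the <code>#</code> separator are assumptions chosen for illustration. In a real topology this step would typically run in an upstream bolt before the tuples reach the Join bolt, and every joined stream would need to build its key in the same way.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.HashMap; |
| import java.util.Map; |
| |
| public class CompositeKeySketch { |
| |
|     // Returns a copy of the tuple with an extra field that concatenates the given fields. |
|     // The separator must not occur in the field values, otherwise keys can collide. |
|     static Map&lt;String, Object&gt; withCompositeKey(Map&lt;String, Object&gt; tuple, String keyName, |
|                                                 String... fields) { |
|         StringBuilder key = new StringBuilder(); |
|         for (String f : fields) { |
|             if (key.length() &gt; 0) { |
|                 key.append('#'); |
|             } |
|             key.append(tuple.get(f)); |
|         } |
|         Map&lt;String, Object&gt; copy = new HashMap&lt;&gt;(tuple); |
|         copy.put(keyName, key.toString()); |
|         return copy; |
|     } |
| |
|     public static void main(String[] args) { |
|         Map&lt;String, Object&gt; t = Map.of("country", "US", "userId", 7); |
|         // prints US#7 |
|         System.out.println(withCompositeKey(t, "joinKey", "country", "userId").get("joinKey")); |
|     } |
| } |
| </code></pre></div> |
| <p>The stream would then be fields grouped and joined on the combined field (<code>joinKey</code> in this sketch).</p> |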
| |
| <h2 id="tips">Tips:</h2> |
| |
| <ol> |
| <li><p>Joins can be CPU and memory intensive. The larger the data accumulated in the current window (proportional to window |
| length), the longer it takes to do the join. Having a short sliding interval (a few seconds, for example) triggers frequent |
| joins. Consequently, performance can suffer when using large window lengths, small sliding intervals, or both.</p></li> |
| <li><p>Duplication of joined records across windows is possible when using Sliding Windows. This is because the tuples continue to exist |
| across multiple windows when using Sliding Windows.</p></li> |
| <li><p>If message timeouts are enabled, ensure the timeout setting (topology.message.timeout.secs) is large enough to comfortably |
| accommodate the window size, plus the additional processing by other spouts and bolts.</p></li> |
| <li><p>Joining a window of two streams with M and N elements each, <em>in the worst case</em>, can produce MxN elements with every output tuple |
| anchored to one tuple from each input stream. This can mean a lot of output tuples from JoinBolt and even more ACKs for downstream bolts |
| to emit. This can place substantial pressure on the messaging system and dramatically slow down the topology if you are not careful. |
| To manage the load on the messaging subsystem, it is advisable to:</p> |
| |
| <ul> |
| <li>Increase the worker's heap (topology.worker.max.heap.size.mb).</li> |
| <li><strong>If</strong> ACKing is not necessary for your topology, disable ACKers (topology.acker.executors=0).</li> |
| <li>Disable event logger (topology.eventlogger.executors=0).</li> |
| <li>Turn off topology debugging (topology.debug=false).</li> |
| <li>Set topology.max.spout.pending to a value large enough to accommodate an estimated full window worth of tuples plus some more for headroom. |
| This helps mitigate the possibility of spouts emitting excessive tuples when the messaging subsystem is experiencing excessive load. This situation |
| can occur when topology.max.spout.pending is set to null.</li> |
| <li>Lastly, keep the window size to the minimum value necessary for solving the problem at hand.</li> |
| </ul></li> |
| </ol> |
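| <p>The settings named in the tips above could be collected in one place, as in the plain-Java sketch below. The keys are the ones listed above; the helper <code>joinConf</code>, the heap size, the 25% headroom and the 60 second slack are assumed values that would need tuning for a real topology. The spout-pending and timeout settings are only set when ACKing is kept on, since they are of little use once ACKers are disabled.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.HashMap; |
| import java.util.Map; |
| |
| public class JoinTopologyConfSketch { |
| |
|     // windowSecs and windowTuples are estimates supplied by the caller (assumptions). |
|     static Map&lt;String, Object&gt; joinConf(boolean ackingNeeded, int windowSecs, int windowTuples) { |
|         Map&lt;String, Object&gt; conf = new HashMap&lt;&gt;(); |
|         conf.put("topology.worker.max.heap.size.mb", 2048.0); // assumed heap size |
|         conf.put("topology.eventlogger.executors", 0); |
|         conf.put("topology.debug", false); |
|         if (ackingNeeded) { |
|             // One full window of tuples plus an assumed 25% headroom. |
|             conf.put("topology.max.spout.pending", windowTuples + windowTuples / 4); |
|             // Window length plus an assumed 60 seconds for the rest of the topology. |
|             conf.put("topology.message.timeout.secs", windowSecs + 60); |
|         } else { |
|             conf.put("topology.acker.executors", 0); |
|         } |
|         return conf; |
|     } |
| |
|     public static void main(String[] args) { |
|         // 10 minute window, roughly 1000 tuples per window, ACKing kept on. |
|         System.out.println(joinConf(true, 600, 1000)); |
|     } |
| } |
| </code></pre></div> |
| <p>Entries like these would then be copied into the configuration object passed at topology submission.</p> |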
| </div> |
| |
| |
| </div> |
| </div> |
| </div> |
| <footer> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Meetups</h5> |
| <ul class="latest-news"> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm &amp; Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm &amp; Kafka Users</a> <span class="small">(Seattle, WA)</span></li> |
| |
| <li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li> |
| |
| <li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li> |
| |
| <li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li> |
| |
| <!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> --> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>About Apache Storm</h5> |
| <p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>First Look</h5> |
| <ul class="footer-list"> |
| <li><a href="/releases/current/Rationale.html">Rationale</a></li> |
| <li><a href="/releases/current/Tutorial.html">Tutorial</a></li> |
| <li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li> |
| <li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Documentation</h5> |
| <ul class="footer-list"> |
| <li><a href="/releases/current/index.html">Index</a></li> |
| <li><a href="/releases/current/javadocs/index.html">Javadoc</a></li> |
| <li><a href="/releases/current/FAQ.html">FAQ</a></li> |
| </ul> |
| </div> |
| </div> |
| </div> |
| <hr/> |
| <div class="row"> |
| <div class="col-md-12"> |
| <p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved. |
| <br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation. |
| <br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p> |
| </div> |
| </div> |
| </div> |
| </footer> |
| <!--Footer End--> |
| <!-- Scroll to top --> |
| <span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span> |
| |
| </body> |
| |
| </html> |
| |