Storm core supports joining multiple data streams into one with the help of JoinBolt
. JoinBolt
is a Windowed bolt, i.e. it waits for the configured window duration to match up the tuples among the streams being joined. This helps align the streams within a Window boundary.
Each of JoinBolt
's incoming data streams must be Fields Grouped on a single field. A stream should only be joined with the other streams using the field on which it has been FieldsGrouped.
Knowing this will help understand the join syntax described below.
Consider the following SQL join involving 4 tables:
select userId, key4, key2, key3 from table1 inner join table2 on table2.userId = table1.key1 inner join table3 on table3.key3 = table2.userId left join table4 on table4.key4 = table3.key3
Similar joins could be expressed on tuples generated by 4 spouts using JoinBolt
:
JoinBolt jbolt = new JoinBolt("spout1", "key1") // from spout1 .join ("spout2", "userId", "spout1") // inner join spout2 on spout2.userId = spout1.key1 .join ("spout3", "key3", "spout2") // inner join spout3 on spout3.key3 = spout2.userId .leftJoin ("spout4", "key4", "spout3") // left join spout4 on spout4.key4 = spout3.key3 .select ("userId, key4, key2, spout3:key3") // chose output fields .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ; topoBuilder.setBolt("joiner", jbolt, 1) .fieldsGrouping("spout1", new Fields("key1") ) .fieldsGrouping("spout2", new Fields("userId") ) .fieldsGrouping("spout3", new Fields("key3") ) .fieldsGrouping("spout4", new Fields("key4") );
The bolt constructor takes two arguments. The 1st argument introduces the data from spout1
to be the first stream and specifies that it will always use field key1
when joining this with the others streams. The name of the component specified must refer to the spout or bolt that is directly connected to the Join bolt. Here data received from spout1
must be fields grouped on key1
. Similarly, each of the leftJoin()
and join()
method calls introduce a new stream along with the field to use for the join. As seen in above example, the same FieldsGrouping requirement applies to these streams as well. The 3rd argument to the join methods refers to another stream with which to join.
The select()
method is used to specify the output fields. The argument to select
is a comma separated list of fields. Individual field names can be prefixed with a stream name to disambiguate between the same field name occurring in multiple streams as follows: .select("spout3:key3, spout4:key3")
. Nested tuple types are supported if the nesting has been done using Map
s. For example outer.inner.innermost
refers to a field that is nested three levels deep where outer
and inner
are of type Map
.
Stream name prefix is not allowed for the fields in any of the join() arguments, but nested fields are supported.
The call to withTumblingWindow()
above, configures the join window to be a 10 minute tumbling window. Since JoinBolt
is a Windowed Bolt, we can also use the withWindow
method to configure it as a sliding window (see tips section below).
new JoinBolt( "spout1", "key1") .join ( "spout2", "userId", "spout3") //not allowed. spout3 not yet introduced .join ( "spout3", "key3", "spout1")
For simplicity, Storm topologies often use the default
stream. Topologies can also use named streams instead of default
streams. To support such topologies, JoinBolt
can be configured to use stream names, instead of source component (spout/bolt) names, via the constructor's first argument:
new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1") .join("stream2", "key2") ...
The first argument JoinBolt.Selector.STREAM
informs the bolt that stream1/2/3/4
refer to named streams (as opposed to names of upstream spouts/bolts).
The below example joins two named streams from four spouts:
new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1") .join ("stream2", "userId", "stream1" ) .select ("userId, key1, key2") .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ; topoBuilder.setBolt("joiner", jbolt, 1) .fieldsGrouping("bolt1", "stream1", new Fields("key1") ) .fieldsGrouping("bolt2", "stream1", new Fields("key1") ) .fieldsGrouping("bolt3", "stream2", new Fields("userId") ) .fieldsGrouping("bolt4", "stream1", new Fields("key1") );
In the above example, it is possible that bolt1
, for example, is emitting other streams also. But the join bolt is only subscribing to stream1
& stream2
from the different bolts. stream1
from bolt1
, bolt2
and bolt4
is treated as a single stream and joined against stream2
from bolt3
.
Currently only INNER and LEFT joins are supported.
Unlike SQL, which allows joining the same table on different keys to different tables, here the same one field must be used on a stream. Fields Grouping ensures the right tuples are routed to the right instances of a Join Bolt. Consequently the FieldsGrouping field must be same as the join field, for correct results. To perform joins on multiple fields, the fields can be combined into one field and then sent to the Join bolt.
Joins can be CPU and memory intensive. The larger the data accumulated in the current window (proportional to window length), the longer it takes to do the join. Having a short sliding interval (few seconds for example) triggers frequent joins. Consequently performance can suffer if using large window lengths or small sliding intervals or both.
Duplication of joined records across windows is possible when using Sliding Windows. This is because the tuples continue to exist across multiple windows when using Sliding Windows.
If message timeouts are enabled, ensure the timeout setting (topology.message.timeout.secs) is large enough to comfortably accommodate the window size, plus the additional processing by other spouts and bolts.
Joining a window of two streams with M and N elements each, in the worst case, can produce MxN elements with every output tuple anchored to one tuple from each input stream. This can mean a lot of output tuples from JoinBolt and even more ACKs for downstream bolts to emit. This can place a substantial pressure on the messaging system and dramatically slowdown the topology if not careful. To manage the load on the messaging subsystem, it is advisable to: