Storm core supports joining multiple data streams into one with the help of JoinBolt.
JoinBolt is a Windowed bolt, i.e. it waits for the configured window duration to match up the tuples among the streams being joined. This helps align the streams within a Window boundary.
JoinBolt's incoming data streams must be Fields Grouped on a single field. A stream should only be joined with the other streams using the field on which it has been Fields Grouped.
Knowing this will help in understanding the join syntax described below.
Consider the following SQL join involving 4 tables:
select userId, key4, key2, key3
from table1
inner join table2 on table2.userId = table1.key1
inner join table3 on table3.key3 = table2.userId
left join table4 on table4.key4 = table3.key3
Similar joins could be expressed on tuples generated by 4 spouts using JoinBolt:
JoinBolt jbolt = new JoinBolt("spout1", "key1")           // from spout1
        .join("spout2", "userId", "spout1")               // inner join spout2 on spout2.userId = spout1.key1
        .join("spout3", "key3", "spout2")                 // inner join spout3 on spout3.key3 = spout2.userId
        .leftJoin("spout4", "key4", "spout3")             // left join spout4 on spout4.key4 = spout3.key3
        .select("userId, key4, key2, spout3:key3")        // choose output fields
        .withTumblingWindow(new Duration(10, TimeUnit.MINUTES));

topoBuilder.setBolt("joiner", jbolt, 1)
        .fieldsGrouping("spout1", new Fields("key1"))
        .fieldsGrouping("spout2", new Fields("userId"))
        .fieldsGrouping("spout3", new Fields("key3"))
        .fieldsGrouping("spout4", new Fields("key4"));
The bolt constructor takes two arguments. The 1st argument introduces the data from spout1 as the first stream and specifies that it will always use field key1 when joining this with the other streams. The name of the component specified must refer to the spout or bolt that is directly connected to the Join bolt. Here, data received from spout1 must be fields grouped on key1. Similarly, each of the join() and leftJoin() method calls introduces a new stream along with the field to use for the join. As seen in the above example, the same FieldsGrouping requirement applies to these streams as well. The 3rd argument to the join methods refers to another stream with which to join.
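To make the difference between join() and leftJoin() concrete, here is a minimal sketch in plain Java that pairs up the tuples of one window. It does not use Storm and is not JoinBolt's implementation; the class `WindowJoinSketch`, its `join` method, and the representation of tuples as `Map`s are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of inner vs. left join semantics within one window.
final class WindowJoinSketch {

    // Pairs each left tuple with every right tuple whose rightField value equals
    // the left tuple's leftField value. When keepUnmatched is true (left join),
    // a left tuple with no match is still emitted on its own.
    static List<Map<String, Object>> join(List<Map<String, Object>> left, String leftField,
                                          List<Map<String, Object>> right, String rightField,
                                          boolean keepUnmatched) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : left) {
            Object key = l.get(leftField);
            boolean matched = false;
            for (Map<String, Object> r : right) {
                if (key != null && key.equals(r.get(rightField))) {
                    Map<String, Object> merged = new HashMap<>(l);
                    merged.putAll(r);
                    out.add(merged);
                    matched = true;
                }
            }
            if (!matched && keepUnmatched) {
                out.add(new HashMap<>(l));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> a = Map.of("key1", 1, "key2", "a");
        Map<String, Object> b = Map.of("key1", 2, "key2", "b");
        Map<String, Object> u = Map.of("userId", 1);
        List<Map<String, Object>> spout1 = List.of(a, b);
        List<Map<String, Object>> spout2 = List.of(u);

        // Inner join keeps only the tuple with key1 == 1; left join also keeps key1 == 2.
        System.out.println(join(spout1, "key1", spout2, "userId", false).size()); // prints 1
        System.out.println(join(spout1, "key1", spout2, "userId", true).size());  // prints 2
    }
}
```

In this sketch the unmatched left tuple is emitted without any fields from the right side, which mirrors how a SQL left join leaves those columns empty.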
The select() method is used to specify the output fields. The argument to select is a comma separated list of fields. Individual field names can be prefixed with a stream name to disambiguate between the same field name occurring in multiple streams as follows: .select("spout3:key3, spout4:key3"). Nested tuple types are supported if the nesting has been done using Maps. For example, outer.inner.innermost refers to a field that is nested three levels deep, where outer and inner are of type Map.
Stream name prefix is not allowed for the fields in any of the join() arguments, but nested fields are supported.
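As a rough illustration of what a dotted name such as outer.inner.innermost means, the sketch below resolves such a path through nested Maps in plain Java. It is not taken from JoinBolt; the class `NestedFieldLookup` and its `lookup` method are hypothetical names, and returning null for a missing segment is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: resolves a dotted field path through nested Maps.
final class NestedFieldLookup {

    // Walks the path one segment at a time. Returns null if a segment is
    // missing or if an intermediate value is not a Map.
    static Object lookup(Map<String, Object> tuple, String dottedPath) {
        Object current = tuple;
        for (String segment : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> inner = new HashMap<>();
        inner.put("innermost", "u42");
        Map<String, Object> outer = new HashMap<>();
        outer.put("inner", inner);
        Map<String, Object> tuple = new HashMap<>();
        tuple.put("outer", outer);

        System.out.println(lookup(tuple, "outer.inner.innermost")); // prints u42
    }
}
```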
The call to withTumblingWindow() above configures the join window to be a 10 minute tumbling window. Since JoinBolt is a Windowed Bolt, we can also use the withWindow method to configure it as a sliding window (see tips section below).
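The stream named in the 3rd argument of a join call must already have been introduced, either by the constructor or by an earlier join call. For example, the following is not allowed:
new JoinBolt("spout1", "key1")
        .join("spout2", "userId", "spout3")   // not allowed: spout3 not yet introduced
        .join("spout3", "key3", "spout1")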
For simplicity, Storm topologies often use the default stream. Topologies can also use named streams instead of default streams. To support such topologies, JoinBolt can be configured to use stream names, instead of source component (spout/bolt) names, via the constructor's first argument:
new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
        .join("stream2", "key2")
        ...
The first argument, JoinBolt.Selector.STREAM, informs the bolt that stream1/2/3/4 refer to named streams (as opposed to names of upstream spouts/bolts).
The example below joins two named streams from four bolts:
JoinBolt jbolt = new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
        .join("stream2", "userId", "stream1")
        .select("userId, key1, key2")
        .withTumblingWindow(new Duration(10, TimeUnit.MINUTES));

topoBuilder.setBolt("joiner", jbolt, 1)
        .fieldsGrouping("bolt1", "stream1", new Fields("key1"))
        .fieldsGrouping("bolt2", "stream1", new Fields("key1"))
        .fieldsGrouping("bolt3", "stream2", new Fields("userId"))
        .fieldsGrouping("bolt4", "stream1", new Fields("key1"));
In the above example, it is possible that bolt1, for example, is also emitting other streams. But the join bolt is only subscribing to stream1 and stream2 from the different bolts. stream1 from bolt1, bolt2 and bolt4 is treated as a single stream and joined against stream2 from bolt3.
Currently only INNER and LEFT joins are supported.
Unlike SQL, which allows joining the same table on different keys to different tables, here the same single field must be used for every join involving a given stream. Fields Grouping ensures the right tuples are routed to the right instances of a Join Bolt. Consequently, the FieldsGrouping field must be the same as the join field for correct results. To perform joins on multiple fields, the fields can be combined into one field and then sent to the Join bolt.
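One possible way to combine several fields into a single join field is to build a composite string key in the upstream component and emit it as an extra field. The sketch below is plain Java rather than Storm code; `CompositeKey` and its `of` method are hypothetical names, and the `|` separator is an assumption that only works if it never occurs in the field values.

```java
// Hypothetical helper: builds one join key out of several field values.
final class CompositeKey {

    // Concatenates the values with '|' between them. The separator is assumed
    // not to appear inside any value; otherwise distinct inputs could collide.
    static String of(Object... parts) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                sb.append('|');
            }
            sb.append(parts[i]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Both streams would emit the same composite value for matching rows.
        System.out.println(of("u1", "eu")); // prints u1|eu
    }
}
```

Each stream being joined would then be fields grouped on, and joined on, the field holding this composite value.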
Joins can be CPU and memory intensive. The larger the data accumulated in the current window (proportional to window length), the longer it takes to do the join. Having a short sliding interval (a few seconds, for example) triggers frequent joins. Consequently, performance can suffer when using large window lengths, small sliding intervals, or both.
Duplication of joined records across windows is possible when using Sliding Windows. This is because the tuples continue to exist across multiple windows when using Sliding Windows.
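The duplication can be seen with a simplified model of windowing, sketched below in plain Java. It assumes windows of the form [start, start + length) that begin at multiples of the sliding interval, which is a simplification and not necessarily how Storm computes window boundaries; `SlidingWindowDuplicates` and `windowsContainingBoth` are hypothetical names.

```java
// Hypothetical model: counts how many windows contain both tuples of a pair.
final class SlidingWindowDuplicates {

    // Windows are [start, start + length) for start = 0, slide, 2*slide, ...
    // up to and including lastStart. A pair is joined once per window that
    // contains both timestamps.
    static int windowsContainingBoth(long tsA, long tsB, long length, long slide, long lastStart) {
        int count = 0;
        for (long start = 0; start <= lastStart; start += slide) {
            boolean hasA = tsA >= start && tsA < start + length;
            boolean hasB = tsB >= start && tsB < start + length;
            if (hasA && hasB) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Sliding window: length 10, slide 5. The pair (6, 7) falls in [0,10) and [5,15).
        System.out.println(windowsContainingBoth(6, 7, 10, 5, 20));  // prints 2
        // Tumbling window: length 10, slide 10. The pair falls only in [0,10).
        System.out.println(windowsContainingBoth(6, 7, 10, 10, 20)); // prints 1
    }
}
```

Under this model, the same joined record is produced once for every window that holds both input tuples, so a smaller sliding interval relative to the window length means more repeats.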
If message timeouts are enabled, ensure the timeout setting (topology.message.timeout.secs) is large enough to comfortably accommodate the window size, plus the additional processing by other spouts and bolts.
Joining a window of two streams with M and N elements each can, in the worst case, produce MxN elements, with every output tuple anchored to one tuple from each input stream. This can mean a lot of output tuples from JoinBolt and even more ACKs for downstream bolts to emit. This can place substantial pressure on the messaging system and dramatically slow down the topology if not managed carefully. To manage the load on the messaging subsystem, it is advisable to: