---
title: Joining Streams in Storm Core
layout: documentation
documentation: true
---

Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
`JoinBolt` is a windowed bolt: it waits for the configured window duration to match up the
tuples among the streams being joined. This helps align the streams within a window boundary.

Each of `JoinBolt`'s incoming data streams must be fields grouped on a single field. A stream
should only be joined with the other streams using the field on which it has been fields grouped.
Knowing this will help in understanding the join syntax described below.

## Performing Joins
Consider the following SQL join involving four tables:

```sql
select userId, key4, key2, key3
from table1
inner join table2 on table2.userId = table1.key1
inner join table3 on table3.key3 = table2.userId
left join table4 on table4.key4 = table3.key3
```

Similar joins can be expressed on tuples generated by four spouts using `JoinBolt`:

```java
JoinBolt jbolt = new JoinBolt("spout1", "key1")      // from spout1
        .join("spout2", "userId", "spout1")          // inner join spout2 on spout2.userId = spout1.key1
        .join("spout3", "key3", "spout2")            // inner join spout3 on spout3.key3 = spout2.userId
        .leftJoin("spout4", "key4", "spout3")        // left join spout4 on spout4.key4 = spout3.key3
        .select("userId, key4, key2, spout3:key3")   // choose output fields
        .withTumblingWindow(new Duration(10, TimeUnit.MINUTES));

topoBuilder.setBolt("joiner", jbolt, 1)
        .fieldsGrouping("spout1", new Fields("key1"))
        .fieldsGrouping("spout2", new Fields("userId"))
        .fieldsGrouping("spout3", new Fields("key3"))
        .fieldsGrouping("spout4", new Fields("key4"));
```

The bolt constructor takes two arguments. The first argument introduces the data from `spout1`
as the first stream, and the second specifies that field `key1` will always be used when joining this stream with the other streams.
The component name specified must refer to the spout or bolt that is directly connected to the join bolt.
Here, data received from `spout1` must be fields grouped on `key1`. Similarly, each `leftJoin()` and `join()` method
call introduces a new stream along with the field to use for the join. As seen in the example above, the same fields grouping
requirement applies to these streams as well. The third argument to the join methods refers to another stream with which
to join.
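
To make the difference between `join()` and `leftJoin()` concrete, here is a minimal, self-contained sketch of inner and left join semantics over the tuples of a single window. It only illustrates the semantics and is not how `JoinBolt` is implemented: the class `WindowJoinSketch`, its `join` method, and the use of plain `Map`s in place of Storm tuples are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch only: mimics inner/left join semantics over the tuples of
// one window. The class and method names are hypothetical, not Storm APIs.
final class WindowJoinSketch {

    // Pairs every left record with every right record whose rightKey value
    // equals the left record's leftKey value. When keepUnmatchedLeft is true
    // (left join), left records without a match are still emitted.
    static List<Map<String, Object>> join(List<Map<String, Object>> left, String leftKey,
                                          List<Map<String, Object>> right, String rightKey,
                                          boolean keepUnmatchedLeft) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : left) {
            Object key = l.get(leftKey);
            boolean matched = false;
            for (Map<String, Object> r : right) {
                if (key != null && key.equals(r.get(rightKey))) {
                    Map<String, Object> merged = new LinkedHashMap<>(l);
                    merged.putAll(r);   // on a field name clash the right-hand value wins
                    out.add(merged);
                    matched = true;
                }
            }
            if (!matched && keepUnmatchedLeft) {
                out.add(new LinkedHashMap<>(l));   // left join keeps the unmatched record
            }
        }
        return out;
    }
}
```

With two records on the left (`key1` = 1 and `key1` = 2) and a single record on the right (`userId` = 1), the inner variant yields one joined record, while the left variant also keeps the unmatched `key1` = 2 record.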

The `select()` method specifies the output fields. Its argument is a comma-separated list of fields.
Individual field names can be prefixed with a stream name to disambiguate between the same field name occurring in
multiple streams, as follows: `.select("spout3:key3, spout4:key3")`. Nested tuple types are supported if the
nesting has been done using `Map`s. For example, `outer.inner.innermost` refers to a field that is nested three levels
deep, where `outer` and `inner` are of type `Map`.

A stream name prefix is not allowed for the fields in any of the `join()` arguments, but nested fields are supported.
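
As an illustration of the dotted notation for nested fields, the following sketch resolves a path such as `outer.inner.innermost` by descending through nested `Map`s. It is a simplified stand-in rather than `JoinBolt`'s own lookup code: `NestedFieldSketch` and `lookup` are hypothetical names, and the root `Map` here plays the role of the tuple.

```java
import java.util.Map;

// Conceptual sketch only: hypothetical helper, not part of the Storm API.
final class NestedFieldSketch {

    // Resolves a dotted path by descending one Map level per path element.
    // Returns null if a level is missing or is not a Map.
    static Object lookup(Map<String, ?> root, String dottedPath) {
        Object current = root;
        for (String part : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(part);
        }
        return current;
    }
}
```

For a record whose `outer` field holds a `Map` containing `inner`, which in turn holds a `Map` containing `innermost`, `lookup(record, "outer.inner.innermost")` returns the innermost value.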

The call to `withTumblingWindow()` above configures the join window to be a 10-minute tumbling window. Since `JoinBolt`
is a windowed bolt, the `withWindow` method can also be used to configure it with a sliding window (see the Tips section below).

## Stream names and Join order
* Stream names must be introduced (in the constructor or as the first argument to the various join methods) before being referred
  to (in the third argument of the join methods). Forward referencing of stream names, as shown below, is not allowed:

```java
new JoinBolt("spout1", "key1")
        .join("spout2", "userId", "spout3")   // not allowed: spout3 not yet introduced
        .join("spout3", "key3", "spout1")
```
* Internally, the joins are performed in the order expressed by the user.
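
The two rules above can be pictured with a small builder that remembers which stream names have been introduced and rejects a join that refers to a name it has not yet seen. This is a sketch under stated assumptions, not `JoinBolt`'s actual validation: `JoinOrderSketch`, its methods, and the choice of `IllegalArgumentException` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch only: hypothetical builder, not part of the Storm API.
final class JoinOrderSketch {
    // Stream names in the order in which they were introduced.
    private final List<String> introduced = new ArrayList<>();

    JoinOrderSketch(String firstStream) {
        introduced.add(firstStream);
    }

    // Introduces newStream and joins it against priorStream, which must
    // already have been introduced (no forward references).
    JoinOrderSketch join(String newStream, String priorStream) {
        if (!introduced.contains(priorStream)) {
            throw new IllegalArgumentException(
                    "'" + priorStream + "' is referenced before it is introduced");
        }
        introduced.add(newStream);
        return this;
    }

    // The joins are applied in the same order in which they were declared.
    List<String> joinOrder() {
        return new ArrayList<>(introduced);
    }
}
```

In this sketch, `new JoinOrderSketch("spout1").join("spout2", "spout3")` throws because `spout3` has not been introduced, whereas joining `spout2` against `spout1` first and then `spout3` against `spout2` succeeds.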

## Joining based on Stream names

For simplicity, Storm topologies often use the `default` stream. Topologies can also use named streams
instead of the `default` stream. To support such topologies, `JoinBolt` can be configured to use stream
names, instead of source component (spout/bolt) names, via the constructor's first argument:

```java
new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
        .join("stream2", "key2", "stream1")
...
```
The first argument, `JoinBolt.Selector.STREAM`, informs the bolt that names such as `stream1` and `stream2` refer to named streams
(as opposed to names of upstream spouts/bolts).

The example below joins two named streams coming from four bolts:

```java
JoinBolt jbolt = new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
        .join("stream2", "userId", "stream1")
        .select("userId, key1, key2")
        .withTumblingWindow(new Duration(10, TimeUnit.MINUTES));

topoBuilder.setBolt("joiner", jbolt, 1)
        .fieldsGrouping("bolt1", "stream1", new Fields("key1"))
        .fieldsGrouping("bolt2", "stream1", new Fields("key1"))
        .fieldsGrouping("bolt3", "stream2", new Fields("userId"))
        .fieldsGrouping("bolt4", "stream1", new Fields("key1"));
```

In the above example, `bolt1`, for instance, may also be emitting other streams, but the join bolt
subscribes only to `stream1` and `stream2` from the different bolts. `stream1` from `bolt1`, `bolt2` and `bolt4`
is treated as a single stream and joined against `stream2` from `bolt3`.
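
One way to picture the effect of the selector is as a function that decides under which name an incoming tuple is filed before joining. The sketch below is an illustration only: `SelectorSketch`, its local `Selector` enum, and `joinSideName` are hypothetical and merely mirror the idea behind `JoinBolt.Selector`.

```java
// Conceptual sketch only: hypothetical names, not part of the Storm API.
final class SelectorSketch {
    // Local stand-in for the two ways of identifying a join side.
    enum Selector { SOURCE, STREAM }

    // Returns the name a tuple is filed under: the stream id when selecting
    // by stream, otherwise the name of the component that emitted it.
    static String joinSideName(Selector selector, String sourceComponent, String streamId) {
        return selector == Selector.STREAM ? streamId : sourceComponent;
    }
}
```

When selecting by stream, tuples arriving on `stream1` from `bolt1` and from `bolt4` are filed under the same name, which is why they behave as one logical stream in the join.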

## Limitations:
1. Currently only INNER and LEFT joins are supported.

2. Unlike SQL, which allows joining the same table on different keys to different tables, here the same single field must be used
   for every join involving a given stream. Fields grouping ensures the right tuples are routed to the right instances of the join bolt. Consequently, the
   fields grouping field must be the same as the join field for correct results. To perform joins on multiple fields, the fields
   can be combined into one field and then sent to the join bolt.
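
For the multi-field case described in the second limitation, an upstream component can add a single composite field before the tuples reach the join bolt. The following is a minimal sketch of that idea using plain `Map`s in place of tuples. `CompositeKeySketch`, `withCompositeKey`, and the field names used below are hypothetical. Packing the values into a `List` instead of concatenating strings is a design choice of this sketch that avoids delimiter collisions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch only: hypothetical helper, not part of the Storm API.
final class CompositeKeySketch {

    // Returns a copy of the record with one extra field whose value is the
    // list of the values of keyFields, in the given order.
    static Map<String, Object> withCompositeKey(Map<String, Object> record,
                                                String compositeField,
                                                String... keyFields) {
        List<Object> parts = new ArrayList<>();
        for (String field : keyFields) {
            parts.add(record.get(field));
        }
        Map<String, Object> out = new LinkedHashMap<>(record);
        out.put(compositeField, Collections.unmodifiableList(parts));
        return out;
    }
}
```

Two records agree on the composite field exactly when they agree on every underlying key field, so the composite field can then serve as both the fields grouping field and the join field.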

## Tips:

1. Joins can be CPU and memory intensive. The larger the data accumulated in the current window (proportional to window
   length), the longer it takes to do the join. Having a short sliding interval (a few seconds, for example) triggers frequent
   joins. Consequently, performance can suffer when using large window lengths, small sliding intervals, or both.

2. Duplication of joined records across windows is possible when using sliding windows. This is because tuples continue to exist
   across multiple windows when using sliding windows.

3. If message timeouts are enabled, ensure the timeout setting (`topology.message.timeout.secs`) is large enough to comfortably
   accommodate the window size, plus the additional processing by other spouts and bolts.

4. Joining a window of two streams with M and N elements each can, *in the worst case*, produce M x N elements, with every output tuple
   anchored to one tuple from each input stream. This can mean a lot of output tuples from `JoinBolt` and even more ACKs for downstream bolts
   to emit. This can place substantial pressure on the messaging system and dramatically slow down the topology if care is not taken.
   To manage the load on the messaging subsystem, it is advisable to:
   * Increase the worker's heap (`topology.worker.max.heap.size.mb`).
   * **If** ACKing is not necessary for your topology, disable ACKers (`topology.acker.executors=0`).
   * Disable the event logger (`topology.eventlogger.executors=0`).
   * Turn off topology debugging (`topology.debug=false`).
   * Set `topology.max.spout.pending` to a value large enough to accommodate an estimated full window's worth of tuples, plus some more for headroom.
     This helps mitigate the possibility of spouts emitting excessive tuples when the messaging subsystem is under excessive load. This situation
     can occur when `topology.max.spout.pending` is set to null.
   * Lastly, keep the window size to the minimum value necessary for solving the problem at hand.
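
The duplication and worst-case effects described in the tips can be checked with a small simulation that counts joined pairs per window. It is a deliberately simplified model, not Storm's windowing logic: it assumes every tuple matches every tuple of the other stream, that windows start at time 0 and advance by the sliding interval, and all names (`SlidingWindowSketch`, `joinedPairs`, `countIn`) are hypothetical.

```java
// Conceptual sketch only: a simplified window model, not Storm's windowing code.
final class SlidingWindowSketch {

    // Sums, over windows [start, start + windowLength) beginning at
    // 0, slide, 2 * slide, ... (while start < horizon), the worst-case number
    // of joined pairs: tuples on the left times tuples on the right.
    static int joinedPairs(long[] leftTimes, long[] rightTimes,
                           long windowLength, long slide, long horizon) {
        if (slide <= 0) {
            throw new IllegalArgumentException("slide must be positive");
        }
        int pairs = 0;
        for (long start = 0; start < horizon; start += slide) {
            long end = start + windowLength;
            pairs += countIn(leftTimes, start, end) * countIn(rightTimes, start, end);
        }
        return pairs;
    }

    // Number of timestamps t with start <= t < end.
    private static int countIn(long[] times, long start, long end) {
        int count = 0;
        for (long t : times) {
            if (t >= start && t < end) {
                count++;
            }
        }
        return count;
    }
}
```

With one left tuple at time 6 and one right tuple at time 7, a tumbling window (length 10, slide 10) emits the joined pair once, while a sliding window (length 10, slide 5) emits it twice because both tuples fall into the windows starting at 0 and at 5. With three left tuples and two right tuples in a single window, the worst case is 3 x 2 = 6 output tuples.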