---
title: Joining Streams in Storm Core
layout: documentation
documentation: true
---
Storm core supports joining multiple data streams into one with the help of `JoinBolt`.
`JoinBolt` is a Windowed bolt, i.e. it waits for the configured window duration to match up the
tuples among the streams being joined. This helps align the streams within a Window boundary.
Each of `JoinBolt`'s incoming data streams must be fields grouped on a single field. A stream
should only be joined with the other streams using the field on which it has been fields grouped.
Keeping this in mind makes the join syntax described below easier to follow.
## Performing Joins
Consider the following SQL join involving 4 tables:
```sql
select userId, key4, key2, key3
from table1
inner join table2 on table2.userId = table1.key1
inner join table3 on table3.key3 = table2.userId
left join table4 on table4.key4 = table3.key3
```
Similar joins could be expressed on tuples generated by 4 spouts using `JoinBolt`:
```java
JoinBolt jbolt = new JoinBolt("spout1", "key1") // from spout1
.join ("spout2", "userId", "spout1") // inner join spout2 on spout2.userId = spout1.key1
.join ("spout3", "key3", "spout2") // inner join spout3 on spout3.key3 = spout2.userId
.leftJoin ("spout4", "key4", "spout3") // left join spout4 on spout4.key4 = spout3.key3
.select ("userId, key4, key2, spout3:key3") // choose output fields
.withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
topoBuilder.setBolt("joiner", jbolt, 1)
.fieldsGrouping("spout1", new Fields("key1") )
.fieldsGrouping("spout2", new Fields("userId") )
.fieldsGrouping("spout3", new Fields("key3") )
.fieldsGrouping("spout4", new Fields("key4") );
```
The bolt constructor takes two arguments. The first introduces the data from `spout1` as the first stream, and the
second specifies that field `key1` will always be used when joining this stream with the other streams.
The component name must refer to the spout or bolt that is directly connected to the join bolt.
Here, data received from `spout1` must be fields grouped on `key1`. Similarly, each `leftJoin()` and `join()` method
call introduces a new stream along with the field to use for the join. As the example above shows, the same fields grouping
requirement applies to these streams as well. The third argument to the join methods names a previously introduced stream with which
to join.
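
To make the difference between `join()` and `leftJoin()` concrete, here is a minimal plain-Java sketch of the two semantics over the tuples held in one window. It is not `JoinBolt`'s internal code: the class `WindowJoinSketch`, its method, and the use of `Map`s to stand in for tuples are assumptions made for illustration, and the naive nested loop is chosen only for readability.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of inner vs. left join semantics; not part of Storm.
class WindowJoinSketch {
    // Joins each row of `left` with every row of `right` whose rightField value
    // equals the left row's leftField value. With keepUnmatchedLeft = true (a left
    // join), left rows without any match are still emitted, without right-side fields.
    static List<Map<String, Object>> join(List<Map<String, Object>> left, String leftField,
                                          List<Map<String, Object>> right, String rightField,
                                          boolean keepUnmatchedLeft) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : left) {
            Object key = l.get(leftField);
            boolean matched = false;
            for (Map<String, Object> r : right) {
                if (key != null && key.equals(r.get(rightField))) {
                    // Merge both sides; on a field-name clash the right side wins here.
                    Map<String, Object> row = new HashMap<>(l);
                    row.putAll(r);
                    out.add(row);
                    matched = true;
                }
            }
            if (!matched && keepUnmatchedLeft) {
                out.add(new HashMap<>(l));
            }
        }
        return out;
    }
}
```

In this sketch, an inner join drops a `spout1` tuple whose `key1` has no matching `userId`, while a left join keeps it with the right-hand fields absent.
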
The `select()` method is used to specify the output fields. The argument to `select` is a comma separated list of fields.
Individual field names can be prefixed with a stream name to disambiguate between the same field name occurring in
multiple streams as follows: `.select("spout3:key3, spout4:key3")`. Nested tuple types are supported if the
nesting has been done using `Map`s. For example `outer.inner.innermost` refers to a field that is nested three levels
deep where `outer` and `inner` are of type `Map`.
A stream name prefix is not allowed on the field names passed to the `join()` and `leftJoin()` methods, but nested fields are supported there.
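
The dotted notation for nested fields can be pictured as a walk through nested `Map`s. The following is a minimal sketch under the assumption that a tuple is modelled as a `Map` from field name to value; `NestedFieldLookup` is a hypothetical helper, not the code `JoinBolt` actually runs.

```java
import java.util.Map;

// Hypothetical helper illustrating dotted-path lookup; not part of Storm.
class NestedFieldLookup {
    // Resolves a path such as "outer.inner.innermost" one segment at a time.
    // Returns null if a segment is missing or an intermediate value is not a Map.
    static Object lookup(Map<String, Object> tuple, String dottedPath) {
        Object current = tuple;
        for (String part : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(part);
        }
        return current;
    }
}
```
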
The call to `withTumblingWindow()` above configures the join window to be a 10 minute tumbling window. Since `JoinBolt`
is a windowed bolt, the `withWindow` method can also be used to configure it with a sliding window (see the Tips section below).
## Stream names and Join order
* Stream names must be introduced (in constructor or as 1st arg to various join methods) before being referred
to (in the 3rd argument of the join methods). Forward referencing of stream names, as shown below, is not allowed:
```java
new JoinBolt( "spout1", "key1")
.join ( "spout2", "userId", "spout3") //not allowed. spout3 not yet introduced
.join ( "spout3", "key3", "spout1")
```
* Internally, the joins will be performed in the order expressed by the user.
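
The ordering rule can be expressed as a simple check over the sequence of join calls. The sketch below is only an illustration of that rule: `JoinOrderCheck` is a hypothetical class, and each join step is reduced to a pair of names (the newly introduced stream and the prior stream it refers to).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the "introduce before referring" rule; not part of Storm.
class JoinOrderCheck {
    // Each element of `joins` is {newStream, priorStream}, in the order the joins are declared.
    // Returns the first prior stream that is referenced before being introduced, or null if none.
    static String firstForwardReference(String firstStream, List<String[]> joins) {
        Set<String> introduced = new HashSet<>();
        introduced.add(firstStream); // introduced by the constructor
        for (String[] join : joins) {
            String newStream = join[0];
            String priorStream = join[1];
            if (!introduced.contains(priorStream)) {
                return priorStream;
            }
            introduced.add(newStream);
        }
        return null;
    }
}
```
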
## Joining based on Stream names
For simplicity, Storm topologies often use the `default` stream. Topologies can also use named streams
instead of `default` streams. To support such topologies, `JoinBolt` can be configured to use stream
names, instead of source component (spout/bolt) names, via the constructor's first argument:
```java
new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
.join("stream2", "key2", "stream1")
...
```
The first argument `JoinBolt.Selector.STREAM` informs the bolt that names such as `stream1` and `stream2` refer to named streams
(as opposed to names of upstream spouts/bolts).
The example below joins two named streams coming from four bolts:
```java
JoinBolt jbolt = new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
.join ("stream2", "userId", "stream1" )
.select ("userId, key1, key2")
.withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
topoBuilder.setBolt("joiner", jbolt, 1)
.fieldsGrouping("bolt1", "stream1", new Fields("key1") )
.fieldsGrouping("bolt2", "stream1", new Fields("key1") )
.fieldsGrouping("bolt3", "stream2", new Fields("userId") )
.fieldsGrouping("bolt4", "stream1", new Fields("key1") );
```
In the above example, `bolt1` (for instance) may also be emitting other streams, but the join bolt
subscribes only to `stream1` and `stream2` from the different bolts. `stream1` from `bolt1`, `bolt2` and `bolt4`
is treated as a single stream and joined against `stream2` from `bolt3`.
## Limitations:
1. Currently only INNER and LEFT joins are supported.
2. Unlike SQL, which allows joining the same table on different keys to different tables, here a stream must use the same single field
in every join it takes part in. Fields grouping ensures that the right tuples are routed to the right instances of the join bolt. Consequently, the
fields grouping field must be the same as the join field for correct results. To perform joins on multiple fields, the fields
can be combined into one field and then sent to the join bolt.
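
One way to combine several fields into a single join field is to build a composite key in an upstream bolt and emit it as its own field. The sketch below shows only the key construction; `CompositeKey` is a hypothetical helper, and the length-prefixed encoding is an assumption chosen so that different field combinations cannot collapse into the same string.

```java
// Hypothetical helper for building a single join key from several field values; not part of Storm.
class CompositeKey {
    // Encodes each part as "<length>:<value>|" so that ("ab", "c") and ("a", "bc")
    // produce different keys.
    static String of(Object... parts) {
        StringBuilder sb = new StringBuilder();
        for (Object part : parts) {
            String s = String.valueOf(part);
            sb.append(s.length()).append(':').append(s).append('|');
        }
        return sb.toString();
    }
}
```

The resulting string would then serve as both the fields grouping field and the join field for that stream.
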
## Tips:
1. Joins can be CPU and memory intensive. The larger the data accumulated in the current window (proportional to window
length), the longer it takes to do the join. Having a short sliding interval (a few seconds, for example) triggers frequent
joins. Consequently, performance can suffer when using large window lengths, small sliding intervals, or both.
2. Duplication of joined records across windows is possible when using Sliding Windows. This is because the tuples continue to exist
across multiple windows when using Sliding Windows.
3. If message timeouts are enabled, ensure the timeout setting (topology.message.timeout.secs) is large enough to comfortably
accommodate the window size, plus the additional processing by other spouts and bolts.
4. Joining a window of two streams with M and N elements each can, *in the worst case*, produce M×N elements, with every output tuple
anchored to one tuple from each input stream. This can mean a lot of output tuples from `JoinBolt` and even more ACKs for downstream bolts
to emit. This can place substantial pressure on the messaging system and dramatically slow down the topology if left unchecked.
To manage the load on the messaging subsystem, it is advisable to:
* Increase the worker's heap (topology.worker.max.heap.size.mb).
* **If** ACKing is not necessary for your topology, disable ACKers (topology.acker.executors=0).
* Disable event logger (topology.eventlogger.executors=0).
* Turn off topology debugging (topology.debug=false).
* Set topology.max.spout.pending to a value large enough to accommodate an estimated full window's worth of tuples, plus some headroom.
This helps prevent spouts from emitting excessive tuples while the messaging subsystem is under heavy load, a situation that
can occur when the value is left as null.
* Lastly, keep the window size to the minimum value necessary for solving the problem at hand.
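
The duplication described in tip 2 can be estimated with a little arithmetic. The sketch below uses a simplified model, assumed here for illustration only, in which windows end at multiples of the sliding interval and each covers the preceding window length; Storm's actual window boundaries may differ. `SlidingWindowSketch` is a hypothetical class.

```java
// Hypothetical, simplified model of sliding windows; not Storm's windowing implementation.
class SlidingWindowSketch {
    // Counts the windows that contain a tuple with timestamp t (t >= 0), assuming windows
    // end at multiples of `slide` and each covers the interval (end - length, end].
    // All values use the same time unit.
    static int windowsContaining(long t, long length, long slide) {
        long firstEnd = ((t + slide - 1) / slide) * slide; // first window end at or after t
        int count = 0;
        for (long end = firstEnd; end < t + length; end += slide) {
            count++;
        }
        return count;
    }
}
```

Under this model, a tuple in a 10 minute window that slides every minute takes part in 10 joins, whereas with a tumbling window (sliding interval equal to the window length) it takes part in exactly one.
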