* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.storm.bolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TimestampExtractor;
import org.apache.storm.windowing.TupleWindow;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public class JoinBolt extends BaseWindowedBolt {
private OutputCollector collector;
// Map[StreamName -> Map[Key -> List<Tuple>] ]
HashMap<String, HashMap<Object, ArrayList<Tuple> >> hashedInputs = new HashMap<>(); // holds remaining streams
// Map[StreamName -> JoinInfo]
protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap<>();
protected FieldSelector[] outputFields; // specified via ... used in declaring Output fields
// protected String[] dotSeparatedOutputFieldNames; // fieldNames in x.y.z format w/o stream name, used for naming output fields
protected String outputStreamName;
// Use streamId, source component name OR field in tuple to distinguish incoming tuple streams
public enum Selector { STREAM, SOURCE }
protected final Selector selectorType;
* Calls JoinBolt(Selector.SOURCE, sourceId, fieldName)
* @param sourceId Id of source component (spout/bolt) from which this bolt is receiving data
* @param fieldName the field to use for joining the stream (x.y.z format)
public JoinBolt(String sourceId, String fieldName) {
this(Selector.SOURCE, sourceId, fieldName);
* Introduces the first stream to start the join with. Equivalent SQL ...
* select .... from srcOrStreamId ...
* @param type Specifies whether 'srcOrStreamId' refers to stream name/source component
* @param srcOrStreamId name of stream OR source component
* @param fieldName the field to use for joining the stream (x.y.z format)
public JoinBolt(Selector type, String srcOrStreamId, String fieldName) {
selectorType = type;
joinCriteria.put(srcOrStreamId, new JoinInfo( new FieldSelector( srcOrStreamId, fieldName) ) );
* Optional. Allows naming the output stream of this bolt. If not specified, the emits will happen on
* 'default' stream.
public JoinBolt withOutputStream(String streamName) {
this.outputStreamName = streamName;
return this;
* Performs inner Join with the newStream.
* SQL : from priorStream inner join newStream on newStream.field = priorStream.field1
* same as: new WindowedQueryBolt(priorStream,field1). join(newStream, field, priorStream);
* Note: priorStream must be previously joined.
* Valid ex: new WindowedQueryBolt(s1,k1). join(s2,k2, s1). join(s3,k3, s2);
* Invalid ex: new WindowedQueryBolt(s1,k1). join(s3,k3, s2). join(s2,k2, s1);
* @param newStream Either stream name or name of upstream component
* @param field the field on which to perform the join
public JoinBolt join(String newStream, String field, String priorStream) {
return joinCommon(newStream, field, priorStream, JoinType.INNER);
* Performs left Join with the newStream.
* SQL : from stream1 left join stream2 on stream2.field = stream1.field1
* same as: new WindowedQueryBolt(stream1, field1). leftJoin(stream2, field, stream1);
* Note: priorStream must be previously joined
* Valid ex: new WindowedQueryBolt(s1,k1). leftJoin(s2,k2, s1). leftJoin(s3,k3, s2);
* Invalid ex: new WindowedQueryBolt(s1,k1). leftJoin(s3,k3, s2). leftJoin(s2,k2, s1);
* @param newStream Either a name of a stream or an upstream component
* @param field the field on which to perform the join
public JoinBolt leftJoin(String newStream, String field, String priorStream) {
return joinCommon(newStream, field, priorStream, JoinType.LEFT);
private JoinBolt joinCommon(String newStream, String fieldDescriptor, String priorStream, JoinType joinType) {
if (hashedInputs.containsKey(newStream)) {
throw new IllegalArgumentException("'" + newStream + "' is already part of join. Cannot join with it more than once.");
hashedInputs.put(newStream, new HashMap<Object, ArrayList<Tuple>>());
JoinInfo joinInfo = joinCriteria.get(priorStream);
if( joinInfo==null )
throw new IllegalArgumentException("Stream '" + priorStream + "' was not previously declared");
FieldSelector field = new FieldSelector(newStream, fieldDescriptor);
joinCriteria.put(newStream, new JoinInfo(field, priorStream, joinInfo, joinType) );
return this;
* Specify projection fields. i.e. Specifies the fields to include in the output.
* e.g: .select("field1, stream2:field2, field3")
* Nested Key names are supported for nested types:
* e.g: .select("outerKey1.innerKey1, outerKey1.innerKey2, stream3:outerKey2.innerKey3)"
* Inner types (non leaf) must be Map<> in order to support nested lookup using this dot notation
* This selected fields implicitly declare the output fieldNames for the bolt based.
* @param commaSeparatedKeys
* @return
public JoinBolt select(String commaSeparatedKeys) {
String[] fieldNames = commaSeparatedKeys.split(",");
outputFields = new FieldSelector[fieldNames.length];
for (int i = 0; i < fieldNames.length; i++) {
outputFields[i] = new FieldSelector(fieldNames[i]);
return this;
public void declareOutputFields(OutputFieldsDeclarer declarer) {
String[] outputFieldNames = new String[outputFields.length];
for( int i=0; i<outputFields.length; ++i ) {
outputFieldNames[i] = outputFields[i].getOutputName() ;
if (outputStreamName!=null) {
declarer.declareStream(outputStreamName, new Fields(outputFieldNames));
} else {
declarer.declare(new Fields(outputFieldNames));
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
// initialize the hashedInputs data structure
int i=0;
for ( String stream : joinCriteria.keySet() ) {
if(i>0) {
hashedInputs.put(stream, new HashMap<Object, ArrayList<Tuple>>());
if(outputFields ==null) {
throw new IllegalArgumentException("Must specify output fields via .select() method.");
public void execute(TupleWindow inputWindow) {
// 1) Perform Join
List<Tuple> currentWindow = inputWindow.get();
JoinAccumulator joinResult = hashJoin(currentWindow);
// 2) Emit results
for (ResultRecord resultRecord : joinResult.getRecords()) {
ArrayList<Object> outputTuple = resultRecord.getOutputFields();
if ( outputStreamName==null )
// explicit anchoring emits to corresponding input tuples only, as default window anchoring will anchor them to all tuples in window
collector.emit( resultRecord.tupleList, outputTuple );
// explicitly anchor emits to corresponding input tuples only, as default window anchoring will anchor them to all tuples in window
collector.emit( outputStreamName, resultRecord.tupleList, outputTuple );
private void clearHashedInputs() {
for (HashMap<Object, ArrayList<Tuple>> mappings : hashedInputs.values()) {
protected JoinAccumulator hashJoin(List<Tuple> tuples) {
JoinAccumulator probe = new JoinAccumulator();
// 1) Build phase - Segregate tuples in the Window into streams.
// First stream's tuples go into probe, rest into HashMaps in hashedInputs
String firstStream = joinCriteria.keySet().iterator().next();
for (Tuple tuple : tuples) {
String streamId = getStreamSelector(tuple);
if ( ! streamId.equals(firstStream) ) {
Object field = getJoinField(streamId, tuple);
ArrayList<Tuple> recs = hashedInputs.get(streamId).get(field);
if(recs == null) {
recs = new ArrayList<Tuple>();
hashedInputs.get(streamId).put(field, recs);
} else {
ResultRecord probeRecord = new ResultRecord(tuple, joinCriteria.size() == 1);
probe.insert( probeRecord ); // first stream's data goes into the probe
// 2) Join the streams in order of streamJoinOrder
int i=0;
for (String streamName : joinCriteria.keySet() ) {
boolean finalJoin = (i==joinCriteria.size()-1);
if(i>0) {
probe = doJoin(probe, hashedInputs.get(streamName), joinCriteria.get(streamName), finalJoin);
return probe;
// Dispatches to the right join method (inner/left/right/outer) based on the joinInfo.joinType
protected JoinAccumulator doJoin(JoinAccumulator probe, HashMap<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
final JoinType joinType = joinInfo.getJoinType();
switch ( joinType ) {
case INNER:
return doInnerJoin(probe, buildInput, joinInfo, finalJoin);
case LEFT:
return doLeftJoin(probe, buildInput, joinInfo, finalJoin);
case RIGHT:
case OUTER:
throw new RuntimeException("Unsupported join type : " + );
// inner join - core implementation
protected JoinAccumulator doInnerJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
String[] probeKeyName = joinInfo.getOtherField();
JoinAccumulator result = new JoinAccumulator();
FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
for (ResultRecord rec : probe.getRecords()) {
Object probeKey = rec.getField(fieldSelector);
if (probeKey!=null) {
ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey);
if(matchingBuildRecs!=null) {
for (Tuple matchingRec : matchingBuildRecs) {
ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
return result;
// left join - core implementation
protected JoinAccumulator doLeftJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
String[] probeKeyName = joinInfo.getOtherField();
JoinAccumulator result = new JoinAccumulator();
FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
for (ResultRecord rec : probe.getRecords()) {
Object probeKey = rec.getField(fieldSelector);
if (probeKey!=null) {
ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey); // ok if its return null
if (matchingBuildRecs!=null && !matchingBuildRecs.isEmpty() ) {
for (Tuple matchingRec : matchingBuildRecs) {
ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
} else {
ResultRecord mergedRecord = new ResultRecord(rec, null, finalJoin);
return result;
// Identify the join field for the stream, and look it up in 'tuple'. field can be nested field: outerKey.innerKey
private Object getJoinField(String streamId, Tuple tuple) {
JoinInfo ji = joinCriteria.get(streamId);
if(ji==null) {
throw new RuntimeException("Join information for '" + streamId + "' not found. Check the join clauses.");
return lookupField(ji.getJoinField(), tuple);
// Returns either the source component name or the stream name for the tuple
private String getStreamSelector(Tuple ti) {
switch (selectorType) {
case STREAM:
return ti.getSourceStreamId();
case SOURCE:
return ti.getSourceComponent();
throw new RuntimeException(selectorType + " stream selector type not yet supported");
protected enum JoinType {INNER, LEFT, RIGHT, OUTER}
/** Describes how to join the other stream with the current stream */
protected static class JoinInfo implements Serializable {
final static long serialVersionUID = 1L;
private JoinType joinType; // nature of join
private FieldSelector field; // field for the current stream
private FieldSelector other; // field for the other (2nd) stream
public JoinInfo(FieldSelector field) {
this.joinType = null;
this.field = field;
this.other = null;
public JoinInfo(FieldSelector field, String otherStream, JoinInfo otherStreamJoinInfo, JoinType joinType) {
this.joinType = joinType;
this.field = field;
this.other = new FieldSelector(otherStream, otherStreamJoinInfo.field.getOutputName() );
public FieldSelector getJoinField() {
return field;
public String getOtherStream() {
return other.getStreamName();
public String[] getOtherField() {
return other.getField();
public JoinType getJoinType() {
return joinType;
} // class JoinInfo
// Join helper to concat fields to the record
protected class ResultRecord {
ArrayList<Tuple> tupleList = new ArrayList<>(); // contains one Tuple per Stream being joined
ArrayList<Object> outFields = null; // refs to fields that will be part of output fields
// 'generateOutputFields' enables us to avoid projection unless it is the final stream being joined
public ResultRecord(Tuple tuple, boolean generateOutputFields) {
if(generateOutputFields) {
outFields = doProjection(tupleList, outputFields);
public ResultRecord(ResultRecord lhs, Tuple rhs, boolean generateOutputFields) {
if(generateOutputFields) {
outFields = doProjection(tupleList, outputFields);
public ArrayList<Object> getOutputFields() {
return outFields;
// 'stream' cannot be null,
public Object getField(FieldSelector fieldSelector) {
for (Tuple tuple : tupleList) {
Object result = lookupField(fieldSelector, tuple);
if (result!=null)
return result;
return null;
protected class JoinAccumulator {
ArrayList<ResultRecord> records = new ArrayList<>();
public void insert(ResultRecord tuple) {
records.add( tuple );
public Collection<ResultRecord> getRecords() {
return records;
// Performs projection on the tuples based on 'projectionFields'
protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, FieldSelector[] projectionFields) {
ArrayList<Object> result = new ArrayList<>(projectionFields.length);
// Todo: optimize this computation... perhaps inner loop can be outside to avoid rescanning tuples
for ( int i = 0; i < projectionFields.length; i++ ) {
boolean missingField = true;
for ( Tuple tuple : tuples ) {
Object field = lookupField(projectionFields[i], tuple ) ;
if (field != null) {
if(missingField) { // add a null for missing fields (usually in case of outer joins)
return result;
protected static class FieldSelector implements Serializable {
String streamName; // can be null;
String[] field; // nested field "x.y.z" becomes => String["x","y","z"]
String outputName; // either "stream1:x.y.z" or "x.y.z" depending on whether stream name is present.
public FieldSelector(String fieldDescriptor) { // sample fieldDescriptor = "stream1:x.y.z"
int pos = fieldDescriptor.indexOf(':');
if (pos>0) { // stream name is specified
streamName = fieldDescriptor.substring(0,pos).trim();
outputName = fieldDescriptor.trim();
field = fieldDescriptor.substring(pos+1, fieldDescriptor.length()).split("\\.");
// stream name unspecified
streamName = null;
if(pos==0) {
outputName = fieldDescriptor.substring(1, fieldDescriptor.length() ).trim();
} else if (pos<0) {
outputName = fieldDescriptor.trim();
field = outputName.split("\\.");
* @param stream name of stream
* @param fieldDescriptor Simple fieldDescriptor like "x.y.z" and w/o a 'stream1:' stream qualifier.
public FieldSelector(String stream, String fieldDescriptor) {
if(fieldDescriptor.indexOf(":")>=0) {
throw new IllegalArgumentException("Not expecting stream qualifier ':' in '" + fieldDescriptor
+ "'. Stream name '" + stream + "' is implicit in this context");
this.streamName = stream;
public FieldSelector(String stream, String[] field) {
this( stream, Joiner.on(".").join(field) );
public String getStreamName() {
return streamName;
public String[] getField() {
return field;
public String getOutputName() {
return toString();
public String toString() {
return outputName;
// Extract the field from tuple. Field may be nested field (x.y.z)
protected Object lookupField(FieldSelector fieldSelector, Tuple tuple) {
// very stream name matches, it stream name was specified
if ( fieldSelector.streamName!=null &&
!fieldSelector.streamName.equalsIgnoreCase( getStreamSelector(tuple) ) ) {
return null;
Object curr = null;
for (int i=0; i < fieldSelector.field.length; i++) {
if (i==0) {
if (tuple.contains(fieldSelector.field[i]) )
curr = tuple.getValueByField(fieldSelector.field[i]);
return null;
} else {
curr = ((Map) curr).get(fieldSelector.field[i]);
if (curr==null)
return null;
return curr;
// Boilerplate overrides to cast result from base type to JoinBolt, so user doesn't have to
// down cast when invoking these methods
public JoinBolt withWindow(Count windowLength, Count slidingInterval) {
return (JoinBolt) super.withWindow(windowLength, slidingInterval);
public JoinBolt withWindow(Count windowLength, Duration slidingInterval) {
return (JoinBolt) super.withWindow(windowLength, slidingInterval);
public JoinBolt withWindow(Duration windowLength, Count slidingInterval) {
return (JoinBolt) super.withWindow(windowLength, slidingInterval);
public JoinBolt withWindow(Duration windowLength, Duration slidingInterval) {
return (JoinBolt) super.withWindow(windowLength, slidingInterval);
public JoinBolt withWindow(Count windowLength) {
return (JoinBolt) super.withWindow(windowLength);
public JoinBolt withWindow(Duration windowLength) {
return (JoinBolt) super.withWindow(windowLength);
public JoinBolt withTumblingWindow(Count count) {
return (JoinBolt) super.withTumblingWindow(count);
public JoinBolt withTumblingWindow(Duration duration) {
return (JoinBolt) super.withTumblingWindow(duration);
public JoinBolt withTimestampField(String fieldName) {
return (JoinBolt) super.withTimestampField(fieldName);
public JoinBolt withTimestampExtractor(TimestampExtractor timestampExtractor) {
return (JoinBolt) super.withTimestampExtractor(timestampExtractor);
public JoinBolt withLateTupleStream(String streamId) {
return (JoinBolt) super.withLateTupleStream(streamId);
public BaseWindowedBolt withLag(Duration duration) {
return (JoinBolt) super.withLag(duration);
public BaseWindowedBolt withWatermarkInterval(Duration interval) {
return (JoinBolt) super.withWatermarkInterval(interval);