// blob: 9c5e86859db09b1f6d93d4e72ffaaedd0fc556ba [file] [log] [blame]
// Copyright 2017 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.streamlet.impl.operators;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.twitter.heron.api.Pair;
import com.twitter.heron.api.bolt.OutputCollector;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.api.tuple.Values;
import com.twitter.heron.api.windowing.TupleWindow;
import com.twitter.heron.streamlet.JoinType;
import com.twitter.heron.streamlet.KeyValue;
import com.twitter.heron.streamlet.KeyedWindow;
import com.twitter.heron.streamlet.SerializableBiFunction;
import com.twitter.heron.streamlet.Window;
/**
 * JoinOperator is the windowed bolt that implements the streamlet join functionality.
 * For every window it groups the incoming key/value tuples by key, keeping the values that
 * arrived from the left source component apart from the values that arrived from any other
 * component (which are treated as the right side), and then emits joined results according
 * to the configured {@link JoinType} (inner, outer-left, outer-right or full outer).
 * Knowing which source is left and which is right is needed for the semantics of the
 * outer joins, where the missing side is handed to the join function as {@code null}.
 *
 * @param <K> type of the join key
 * @param <V1> type of the values coming from the left component
 * @param <V2> type of the values coming from the right component
 * @param <VR> type of the value produced by the join function
 */
public class JoinOperator<K, V1, V2, VR> extends StreamletWindowOperator {
  private static final long serialVersionUID = 4875450390444745407L;
  // Keys under which the left/right source component names are published
  // in the component configuration (see getComponentConfiguration).
  public static final String LEFT_COMPONENT_NAME = "_streamlet_joinbolt_left_component_name_";
  public static final String RIGHT_COMPONENT_NAME = "_streamlet_joinbolt_right_component_name_";
  // The kind of join to perform on every window
  private JoinType joinType;
  // The source component that represent the left join component
  private String leftComponent;
  // The source component that represent the right join component
  private String rightComponent;
  // The user supplied join function
  private SerializableBiFunction<? super V1, ? super V2, ? extends VR> joinFn;
  // Collector used to emit the joined results; assigned in prepare()
  private OutputCollector collector;

  /**
   * Creates a join operator.
   *
   * @param joinType the kind of join to perform
   * @param leftComponent name of the source component that feeds the left side
   * @param rightComponent name of the source component that feeds the right side
   * @param joinFn function combining a left value and a right value into a result; for the
   *     outer joins it is invoked with {@code null} in place of the missing side
   */
  public JoinOperator(JoinType joinType, String leftComponent, String rightComponent,
                      SerializableBiFunction<? super V1, ? super V2, ? extends VR> joinFn) {
    this.joinType = joinType;
    this.leftComponent = leftComponent;
    this.rightComponent = rightComponent;
    this.joinFn = joinFn;
  }

  /**
   * Remembers the output collector that is later used to emit the joined tuples.
   */
  @SuppressWarnings("rawtypes")
  @Override
  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    collector = outputCollector;
  }

  /**
   * Extends the parent's component configuration with the names of the left and right
   * source components.
   *
   * @return the configuration map obtained from the superclass, with the two entries added
   */
  @Override
  public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> cfg = super.getComponentConfiguration();
    cfg.put(LEFT_COMPONENT_NAME, leftComponent);
    cfg.put(RIGHT_COMPONENT_NAME, rightComponent);
    return cfg;
  }

  /**
   * Groups every tuple of the window by key and side, then emits the joined results.
   * Tuples whose key is {@code null} are skipped. A tuple whose source component is not the
   * left component is treated as belonging to the right side.
   *
   * @param inputWindow the window of tuples to join
   */
  @SuppressWarnings("unchecked")
  @Override
  public void execute(TupleWindow inputWindow) {
    Map<K, Pair<List<V1>, List<V2>>> joinMap = new HashMap<>();
    for (Tuple tuple : inputWindow.get()) {
      if (tuple.getSourceComponent().equals(leftComponent)) {
        KeyValue<K, V1> tup = (KeyValue<K, V1>) tuple.getValue(0);
        if (tup.getKey() != null) {
          addMapLeft(joinMap, tup);
        }
      } else {
        KeyValue<K, V2> tup = (KeyValue<K, V2>) tuple.getValue(0);
        if (tup.getKey() != null) {
          addMapRight(joinMap, tup);
        }
      }
    }
    evaluateJoinMap(joinMap, inputWindow);
  }

  /**
   * Walks over every key collected for the window and emits the results that the configured
   * join type calls for. When both sides have values for a key, every join type emits the
   * inner join; the outer variants additionally emit one-sided results for keys that were
   * seen on their preserved side only.
   */
  private void evaluateJoinMap(Map<K, Pair<List<V1>, List<V2>>> joinMap, TupleWindow tupleWindow) {
    // Iterate over the entries so that each key needs no second lookup in the map.
    for (Map.Entry<K, Pair<List<V1>, List<V2>>> entry : joinMap.entrySet()) {
      K key = entry.getKey();
      Pair<List<V1>, List<V2>> val = entry.getValue();
      boolean hasLeft = !val.getFirst().isEmpty();
      boolean hasRight = !val.getSecond().isEmpty();
      switch (joinType) {
        case INNER:
          if (hasLeft && hasRight) {
            innerJoinAndEmit(key, tupleWindow, val);
          }
          break;
        case OUTER_LEFT:
          if (hasLeft && hasRight) {
            innerJoinAndEmit(key, tupleWindow, val);
          } else if (hasLeft) {
            outerLeftJoinAndEmit(key, tupleWindow, val);
          }
          break;
        case OUTER_RIGHT:
          if (hasLeft && hasRight) {
            innerJoinAndEmit(key, tupleWindow, val);
          } else if (hasRight) {
            outerRightJoinAndEmit(key, tupleWindow, val);
          }
          break;
        case OUTER:
          if (hasLeft && hasRight) {
            innerJoinAndEmit(key, tupleWindow, val);
          } else if (hasRight) {
            outerRightJoinAndEmit(key, tupleWindow, val);
          } else if (hasLeft) {
            outerLeftJoinAndEmit(key, tupleWindow, val);
          }
          break;
        default:
          throw new RuntimeException("Unknown join type " + joinType.name());
      }
    }
  }

  /**
   * Returns the (left values, right values) bucket of the given key, creating an empty
   * bucket first if the key has not been seen yet in this window.
   */
  private Pair<List<V1>, List<V2>> bucketFor(Map<K, Pair<List<V1>, List<V2>>> joinMap, K key) {
    return joinMap.computeIfAbsent(key,
        unused -> Pair.of(new LinkedList<>(), new LinkedList<>()));
  }

  /** Records a value that arrived from the left component under its key. */
  private void addMapLeft(Map<K, Pair<List<V1>, List<V2>>> joinMap, KeyValue<K, V1> tup) {
    bucketFor(joinMap, tup.getKey()).getFirst().add(tup.getValue());
  }

  /** Records a value that arrived from the right side under its key. */
  private void addMapRight(Map<K, Pair<List<V1>, List<V2>>> joinMap, KeyValue<K, V2> tup) {
    bucketFor(joinMap, tup.getKey()).getSecond().add(tup.getValue());
  }

  /**
   * Builds the {@link KeyedWindow} that is used as the key of every emitted result.
   * A missing (null) start or end timestamp is reported as 0; the window's count is the
   * number of tuples in the whole tuple window.
   */
  private KeyedWindow<K> getKeyedWindow(K key, TupleWindow tupleWindow) {
    Long start = tupleWindow.getStartTimestamp();
    Long end = tupleWindow.getEndTimestamp();
    long startWindow = start == null ? 0L : start;
    long endWindow = end == null ? 0L : end;
    Window window = new Window(startWindow, endWindow, tupleWindow.get().size());
    return new KeyedWindow<>(key, window);
  }

  /** Emits a single join result keyed by the given keyed window. */
  private void emitJoined(KeyedWindow<K> keyedWindow, VR joined) {
    collector.emit(new Values(new KeyValue<>(keyedWindow, joined)));
  }

  /** Emits the join function's result for every (left, right) combination of the key. */
  private void innerJoinAndEmit(K key, TupleWindow tupleWindow, Pair<List<V1>, List<V2>> val) {
    KeyedWindow<K> keyedWindow = getKeyedWindow(key, tupleWindow);
    for (V1 val1 : val.getFirst()) {
      for (V2 val2 : val.getSecond()) {
        emitJoined(keyedWindow, joinFn.apply(val1, val2));
      }
    }
  }

  /** Emits one result per left value, passing null to the join function as the right value. */
  private void outerLeftJoinAndEmit(K key, TupleWindow tupleWindow, Pair<List<V1>, List<V2>> val) {
    KeyedWindow<K> keyedWindow = getKeyedWindow(key, tupleWindow);
    for (V1 val1 : val.getFirst()) {
      emitJoined(keyedWindow, joinFn.apply(val1, null));
    }
  }

  /** Emits one result per right value, passing null to the join function as the left value. */
  private void outerRightJoinAndEmit(K key, TupleWindow tupleWindow, Pair<List<V1>, List<V2>> val) {
    KeyedWindow<K> keyedWindow = getKeyedWindow(key, tupleWindow);
    for (V2 val2 : val.getSecond()) {
      emitJoined(keyedWindow, joinFn.apply(null, val2));
    }
  }
}