blob: d9f2bbfaf8421e21774f5f8b0c12e8eac36bed81 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.join.stream.bundle;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.streaming.api.bundle.CoBundleTrigger;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.runtime.join.stream.state.JoinStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.OnlyEqualityConditionMatchStateHandler;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.flink.table.dataformat.util.BaseRowUtil.ACCUMULATE_MSG;
import static org.apache.flink.table.dataformat.util.BaseRowUtil.RETRACT_MSG;
/**
 * Mini-batch stream operator for SEMI and ANTI joins.
 *
 * <p>Only records of the left input are ever emitted. For a SEMI join ({@code isSemi == true}) a
 * left record is emitted while its match count is non-zero; for an ANTI join it is emitted while
 * its match count is zero. Per-record match counts are kept in {@link JoinMatchStateHandler}s so
 * that a change on the right input can flip a left record between matched and unmatched and
 * trigger the corresponding accumulate or retract message.
 */
public class MiniBatchAntiSemiJoinStreamOperator extends MiniBatchJoinStreamOperator {

	private static final long serialVersionUID = 1L;

	/** Kind of match-count state used for the left input. */
	protected final JoinMatchStateHandler.Type leftMatchStateType;
	/** Kind of match-count state used for the right input. */
	protected final JoinMatchStateHandler.Type rightMatchStateType;

	protected transient JoinMatchStateHandler leftMatchStateHandler;
	protected transient JoinMatchStateHandler rightMatchStateHandler;

	protected transient BaseRow leftSideNullRow;
	protected transient BaseRow rightSideNullRow;

	/** {@code true} for a SEMI join, {@code false} for an ANTI join. */
	protected final Boolean isSemi;
	/** {@code true} when the join has only equality predicates (no non-equi condition). */
	protected final Boolean isEqual;

	/**
	 * Creates the operator. Parameters not documented here are forwarded unchanged to
	 * {@link MiniBatchJoinStreamOperator}.
	 *
	 * @param leftMatchStateType match-count state kind for the left input
	 * @param rightMatchStateType match-count state kind for the right input
	 * @param isSemi {@code true} for SEMI join semantics, {@code false} for ANTI join semantics
	 * @param isEqualJoin {@code true} when the join condition consists of equality predicates only
	 */
	public MiniBatchAntiSemiJoinStreamOperator(
			BaseRowTypeInfo leftType,
			BaseRowTypeInfo rightType, GeneratedJoinConditionFunction condFuncCode,
			KeySelector<BaseRow, BaseRow> leftKeySelector,
			KeySelector<BaseRow, BaseRow> rightKeySelector,
			GeneratedProjection leftPkProjectCode, GeneratedProjection rightPkProjectCode,
			JoinStateHandler.Type leftJoinStateType,
			JoinStateHandler.Type rightJoinStateType, long maxRetentionTime, long minRetentionTime,
			JoinMatchStateHandler.Type leftMatchStateType,
			JoinMatchStateHandler.Type rightMatchStateType,
			Boolean leftIsAccRetract, Boolean rightIsAccRetract, Boolean isSemi, Boolean
			isEqualJoin, boolean[] filterNullKeys, CoBundleTrigger<BaseRow, BaseRow> coBundleTrigger,
			boolean finishBundleBeforeSnapshot) {
		super(leftType, rightType, condFuncCode, leftKeySelector, rightKeySelector, leftPkProjectCode,
			rightPkProjectCode, leftJoinStateType, rightJoinStateType, maxRetentionTime,
			minRetentionTime, leftIsAccRetract, rightIsAccRetract, filterNullKeys, coBundleTrigger,
			finishBundleBeforeSnapshot);
		this.leftMatchStateType = leftMatchStateType;
		this.rightMatchStateType = rightMatchStateType;
		this.isSemi = isSemi;
		this.isEqual = isEqualJoin;
	}

	@Override
	public void open() throws Exception {
		super.open();
		this.leftSideNullRow = new GenericRow(leftType.getArity());
		this.rightSideNullRow = new GenericRow(rightType.getArity());
		LOG.info("Init MiniBatchAntiSemiJoinStreamOperator.");
		LOG.info("leftJoinStateType {}, rightJoinStateType {}, leftMatchStateType {}, rightMatchStateType {}",
			leftJoinStateType, rightJoinStateType, leftMatchStateType, rightMatchStateType);
		LOG.info("isSemi: {}, isEqualJoin: {}", isSemi, isEqual);
	}

	@Override
	protected void initAllStates() throws Exception {
		super.initAllStates();
		this.leftMatchStateHandler = createMatchStateHandler(
			leftType, leftMatchStateType, leftKeyType, "LeftMatchHandler", leftPkProjectCode);
		this.rightMatchStateHandler = createMatchStateHandler(
			rightType, rightMatchStateType, rightKeyType, "RightMatchHandler", rightPkProjectCode);
	}

	/**
	 * Drops the join state and match state of the timer's key once its retention has expired.
	 * Namespace {@code 1} identifies left-input timers; any other namespace identifies the right
	 * input.
	 */
	@Override
	public void onProcessingTime(InternalTimer<BaseRow, Byte> timer) throws Exception {
		byte namespace = timer.getNamespace();
		if (namespace == 1) {
			// left
			if (needToCleanupState(timer.getKey(), timer.getTimestamp(), leftTimerState)) {
				leftMatchStateHandler.remove(timer.getKey());
				leftStateHandler.remove(timer.getKey());
			}
		} else {
			// right
			if (needToCleanupState(timer.getKey(), timer.getTimestamp(), rightTimerState)) {
				rightMatchStateHandler.remove(timer.getKey());
				rightStateHandler.remove(timer.getKey());
			}
		}
	}

	/**
	 * Processes one mini-batch. The right bundle is joined first, then the left bundle.
	 * NOTE(review): presumably this order lets left records see the right records of the same
	 * bundle while keeping this bundle's left records out of the right-side pass — confirm.
	 */
	@Override
	public void processBundles(
			Map<BaseRow, List<BaseRow>> left,
			Map<BaseRow, List<BaseRow>> right,
			Collector<BaseRow> out) throws Exception {
		// process right
		processSingleSideBundles(right, left, rightJoinStateType, leftJoinStateType, rightStateHandler,
			leftStateHandler, rightMatchStateHandler, leftMatchStateHandler, rightTimerState,
			false, true, true, out);
		// process left
		processSingleSideBundles(left, right, leftJoinStateType, rightJoinStateType, leftStateHandler,
			rightStateHandler, leftMatchStateHandler, rightMatchStateHandler, leftTimerState,
			true, true, true, out);
	}

	/**
	 * Joins all keys of one input's bundle against the other input's state.
	 *
	 * <p>When the other side's join state type is {@code JOIN_KEY_CONTAIN_PRIMARY_KEY} or
	 * {@code COUNT_KEY_SIZE}, that side's records for all bundle keys are fetched in one
	 * {@code batchGet} call first. When the input side's join state type is
	 * {@code JOIN_KEY_CONTAIN_PRIMARY_KEY}, the last reduced record of each key is written back
	 * (or deleted, if its count is negative) in one batched {@code putAll}/{@code removeAll}.
	 *
	 * @param inputSide bundle records of the input side, grouped by join key
	 * @param otherSide bundle records of the other side (not read here)
	 * @param inputSideIsLeft whether the input side is the left input
	 */
	public void processSingleSideBundles(
			Map<BaseRow, List<BaseRow>> inputSide,
			Map<BaseRow, List<BaseRow>> otherSide,
			JoinStateHandler.Type inputSideJoinStateType,
			JoinStateHandler.Type otherSideJoinStateType,
			JoinStateHandler inputSideStateHandler,
			JoinStateHandler otherSideStateHandler,
			JoinMatchStateHandler inputSideMatchStateHandler,
			JoinMatchStateHandler otherSideMatchStateHandler,
			KeyedValueState<BaseRow, Long> timerState,
			boolean inputSideIsLeft, boolean inputIsOuter, boolean otherSideIsOuter, Collector<BaseRow> out)
			throws Exception {
		// batch get values for all keys if other side is valuestate
		if (otherSideJoinStateType == JoinStateHandler.Type.JOIN_KEY_CONTAIN_PRIMARY_KEY ||
				otherSideJoinStateType == JoinStateHandler.Type.COUNT_KEY_SIZE) {
			Set<BaseRow> keySet = new HashSet<>(inputSide.keySet());
			otherSideStateHandler.batchGet(keySet);
		}

		boolean inputSideIsValueState =
			inputSideJoinStateType == JoinStateHandler.Type.JOIN_KEY_CONTAIN_PRIMARY_KEY;
		Map<BaseRow, BaseRow> putMap = new HashMap<>();
		Set<BaseRow> deleteSet = new HashSet<>();
		Boolean isAccRetract = inputSideIsLeft ? leftIsAccRetract : rightIsAccRetract;

		for (Map.Entry<BaseRow, List<BaseRow>> entry : inputSide.entrySet()) {
			BaseRow joinKey = entry.getKey();
			List<Tuple2<BaseRow, Long>> reducedList = reduceCurrentList(entry.getValue(),
				inputSideStateHandler, isAccRetract);
			joinCurrentList(
				joinKey,
				reducedList,
				inputSideStateHandler,
				otherSideStateHandler,
				inputSideMatchStateHandler,
				otherSideMatchStateHandler,
				inputSideIsLeft, inputIsOuter, otherSideIsOuter,
				timerState,
				stateCleaningEnabled);
			if (inputSideIsValueState && !reducedList.isEmpty()) {
				// only the last reduced record decides what is stored for this key
				Tuple2<BaseRow, Long> lastTuple = reducedList.get(reducedList.size() - 1);
				if (lastTuple.f1 < 0) {
					deleteSet.add(joinKey);
				} else {
					putMap.put(joinKey, lastTuple.f0);
				}
			}
		}

		// batch update input side data for all keys if input side is valuestate
		if (inputSideIsValueState) {
			inputSideStateHandler.putAll(putMap);
			inputSideStateHandler.removeAll(deleteSet);
		}
	}

	/**
	 * Joins the reduced records of one join key from the input side against the other side's
	 * cached records and emits the resulting SEMI/ANTI changes.
	 *
	 * <ol>
	 *   <li>Register a cleanup timer for the key and apply {@code batchUpdate} to the input
	 *       side's join state. This is skipped by the SEMI shortcut described in the body.</li>
	 *   <li>For every cached other-side record: add up the counts of the input records that
	 *       satisfy the join condition with it, and update its stored match count. When the input
	 *       is the right side, the other-side record is a left record. Emit or retract it when
	 *       its match count moves from 0 to positive, or from positive to non-positive. Other-side
	 *       records whose third tuple field is below the current time are removed when cleaning is
	 *       enabled.</li>
	 *   <li>When the input is the left side: emit each input record according to its own match
	 *       count (SEMI: non-zero, ANTI: zero), then record that count in the left match
	 *       state.</li>
	 * </ol>
	 *
	 * @param cleaningBasedTimer whether expired other-side records may be removed here
	 */
	private void joinCurrentList(
			BaseRow currentJoinKey,
			List<Tuple2<BaseRow, Long>> inputList,
			JoinStateHandler inputSideStateHandler,
			JoinStateHandler otherSideStateHandler,
			JoinMatchStateHandler inputSideMatchStateHandler,
			JoinMatchStateHandler otherSideMatchStateHandler,
			boolean inputIsLeft, boolean inputIsOuter, boolean otherSideIsOuter,
			KeyedValueState<BaseRow, Long> timerState,
			boolean cleaningBasedTimer) throws Exception {
		inputSideStateHandler.setCurrentJoinKey(currentJoinKey);
		long currentTime = internalTimerService.currentProcessingTime();
		registerProcessingCleanupTimer(inputSideStateHandler.getCurrentJoinKey(), currentTime, inputIsLeft, timerState);

		Iterator<Tuple3<BaseRow, Long, Long>> iterator = otherSideStateHandler.getRecordsFromCache(currentJoinKey);

		// The left side need not store data if the other side is non-empty and the join has no
		// non-equality predicate (SEMI join, right side without retractions). In that case the
		// left records are emitted directly and the other-side loop below is skipped.
		boolean leftWontStoreData = isSemi && isEqual && !rightIsAccRetract && inputIsLeft && iterator.hasNext();
		long[] updateStatus;
		if (leftWontStoreData) {
			updateStatus = new long[inputList.size()];
		} else {
			updateStatus = inputSideStateHandler.batchUpdate(currentJoinKey, inputList, currentTime + maxRetentionTime);
		}

		boolean otherSideOnlyEquality = otherSideMatchStateHandler instanceof OnlyEqualityConditionMatchStateHandler;
		long otherSideOldJoinCnt = 0;
		long otherSideNewJoinCnt = 0;
		long[] inputSideJoinCnt = new long[inputList.size()];

		// for OnlyEqualityConditionMatchStateHandler the count is read once per key (row == null)
		if (otherSideOnlyEquality) {
			otherSideMatchStateHandler.extractCurrentRowMatchJoinCount(currentJoinKey, null, 0);
		}

		while (!leftWontStoreData && iterator.hasNext()) {
			Tuple3<BaseRow, Long, Long> tuple3 = iterator.next();
			BaseRow matchRow = tuple3.f0;
			long matchRowCount = tuple3.f1;
			if (!otherSideOnlyEquality) {
				otherSideMatchStateHandler.extractCurrentRowMatchJoinCount(currentJoinKey, matchRow, 0);
			}
			otherSideOldJoinCnt = otherSideMatchStateHandler.getCurrentRowMatchJoinCnt();

			long matchCnt = 0;
			int idx = 0;
			for (Tuple2<BaseRow, Long> tuple2 : inputList) {
				// applyCondition always takes (left row, right row)
				boolean matched = inputIsLeft
					? applyCondition(tuple2.f0, matchRow, inputSideStateHandler.getCurrentJoinKey())
					: applyCondition(matchRow, tuple2.f0, inputSideStateHandler.getCurrentJoinKey());
				if (matched) {
					matchCnt += tuple2.f1;
					inputSideJoinCnt[idx] += matchRowCount;
				}
				idx++;
			}
			otherSideNewJoinCnt = otherSideOldJoinCnt + matchCnt;

			// input is right => matchRow is a left record whose output may flip
			if (!inputIsLeft) {
				if (otherSideOldJoinCnt == 0 && otherSideNewJoinCnt > 0) {
					// became matched: SEMI starts emitting it, ANTI retracts it
					matchRow.setHeader(isSemi ? ACCUMULATE_MSG : RETRACT_MSG);
					collectResult(matchRow, matchRowCount);
				} else if (otherSideOldJoinCnt > 0 && otherSideNewJoinCnt <= 0) {
					// became unmatched: SEMI retracts it, ANTI starts emitting it
					matchRow.setHeader(isSemi ? RETRACT_MSG : ACCUMULATE_MSG);
					collectResult(matchRow, matchRowCount);
				}
			}
			matchRow.setHeader(ACCUMULATE_MSG);

			if (!otherSideOnlyEquality) {
				otherSideMatchStateHandler.updateRowMatchJoinCnt(currentJoinKey, matchRow, otherSideNewJoinCnt);
			}
			if (tuple3.f2 < currentTime && cleaningBasedTimer) {
				// expire
				iterator.remove();
				otherSideMatchStateHandler.remove(currentJoinKey, matchRow);
			}
		}

		// for OnlyEqualityConditionMatchStateHandler the new count is written once per key
		if (otherSideOnlyEquality && otherSideNewJoinCnt != otherSideOldJoinCnt) {
			otherSideMatchStateHandler.updateRowMatchJoinCnt(currentJoinKey, null, otherSideNewJoinCnt);
		}

		if (inputIsLeft) {
			Set<BaseRow> deleteSet = new HashSet<>();
			Map<BaseRow, Long> addMap = new HashMap<>();
			int idx = 0;
			for (Tuple2<BaseRow, Long> tuple2 : inputList) {
				// SEMI emits records with a non-zero match count, ANTI those with a zero count
				if (((inputSideJoinCnt[idx] == 0) ^ isSemi) || leftWontStoreData) {
					tuple2.f0.setHeader(tuple2.f1 < 0 ? RETRACT_MSG : ACCUMULATE_MSG);
					collectResult(tuple2.f0, tuple2.f1);
				}
				tuple2.f0.setHeader(ACCUMULATE_MSG);
				if (updateStatus[idx] == -1) {
					deleteSet.add(tuple2.f0);
				} else if (updateStatus[idx] == 1) {
					addMap.put(tuple2.f0, inputSideJoinCnt[idx]);
				}
				idx++;
			}
			// batch update and delete
			if (leftWontStoreData) {
				// updateStatus is all zeros on this path, so both collections are already empty;
				// clearing is defensive. Reassigning the parameter does not affect the caller's list.
				deleteSet.clear();
				addMap.clear();
				inputList = new LinkedList<>();
			}
			inputSideMatchStateHandler.removeAll(currentJoinKey, deleteSet);
			inputSideMatchStateHandler.addAll(currentJoinKey, addMap);
		}
	}

	@Override
	public TwoInputSelection firstInputSelection() {
		return TwoInputSelection.ANY;
	}
}