blob: a03f64e1f9908d31b6314e13ab623fa30c912491 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.join.stream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.join.stream.state.JoinStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.JoinMatchStateHandler;
import org.apache.flink.table.runtime.join.stream.state.match.NonBatchOnlyEqualityConditionMatchStateHandler;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import java.util.Iterator;
import static org.apache.flink.table.dataformat.util.BaseRowUtil.ACCUMULATE_MSG;
import static org.apache.flink.table.dataformat.util.BaseRowUtil.RETRACT_MSG;
/**
 * Base operator for stream outer joins, built on top of {@link JoinStreamOperator}.
 *
 * <p>In addition to the per-side join state managed by the parent class, each side owns a
 * {@link JoinMatchStateHandler} that keeps a join count for the rows stored on that side. Those counts
 * let the operator:
 * <ul>
 *     <li>emit a null-padded row for an outer-side input that matched nothing on the other side,</li>
 *     <li>retract the null-padded rows of an outer-side row when its first match arrives, and</li>
 *     <li>re-emit those null-padded rows when its last match is retracted.</li>
 * </ul>
 * Which sides are outer is decided by the flags subclasses pass to
 * {@link #processElement(BaseRow, JoinStateHandler, JoinStateHandler, JoinMatchStateHandler,
 * JoinMatchStateHandler, boolean, boolean, boolean, KeyedValueState)}.
 */
@Internal
abstract class OuterJoinStreamOperator extends JoinStreamOperator {

	private static final long serialVersionUID = 1L;

	/** Kind of match-state handler created for the left input. */
	protected final JoinMatchStateHandler.Type leftMatchStateType;

	/** Kind of match-state handler created for the right input. */
	protected final JoinMatchStateHandler.Type rightMatchStateType;

	/** Join-count state for rows stored on the left side; created in {@link #initAllStates()}. */
	protected transient JoinMatchStateHandler leftMatchStateHandler;

	/** Join-count state for rows stored on the right side; created in {@link #initAllStates()}. */
	protected transient JoinMatchStateHandler rightMatchStateHandler;

	/** Field-less {@link GenericRow} of the left arity, used as padding when a right row has no match. */
	protected transient BaseRow leftSideNullRow;

	/** Field-less {@link GenericRow} of the right arity, used as padding when a left row has no match. */
	protected transient BaseRow rightSideNullRow;

	public OuterJoinStreamOperator(
			BaseRowTypeInfo leftType,
			BaseRowTypeInfo rightType, GeneratedJoinConditionFunction condFuncCode,
			KeySelector<BaseRow, BaseRow> leftKeySelector,
			KeySelector<BaseRow, BaseRow> rightKeySelector,
			GeneratedProjection leftPkProjectCode, GeneratedProjection rightPkProjectCode,
			JoinStateHandler.Type leftJoinStateType,
			JoinStateHandler.Type rightJoinStateType, long maxRetentionTime, long minRetentionTime,
			JoinMatchStateHandler.Type leftMatchStateType,
			JoinMatchStateHandler.Type rightMatchStateType, boolean[] filterNullKeys) {
		super(leftType, rightType, condFuncCode, leftKeySelector, rightKeySelector, leftPkProjectCode,
				rightPkProjectCode, leftJoinStateType, rightJoinStateType, maxRetentionTime, minRetentionTime,
				filterNullKeys);
		this.leftMatchStateType = leftMatchStateType;
		this.rightMatchStateType = rightMatchStateType;
	}

	/**
	 * Opens the parent operator, builds the null-padding rows for both sides and logs the configured
	 * join-state and match-state types.
	 */
	@Override
	public void open() throws Exception {
		super.open();
		this.leftSideNullRow = new GenericRow(leftType.getArity());
		this.rightSideNullRow = new GenericRow(rightType.getArity());
		LOG.info("leftJoinStateType {}, rightJoinStateType {}, leftMatchStateType {}, rightMatchStateType {}",
				leftJoinStateType, rightJoinStateType, leftMatchStateType, rightMatchStateType);
	}

	/** Initializes the parent's states, then creates the left and right match-state handlers. */
	@Override
	protected void initAllStates() throws Exception {
		super.initAllStates();
		this.leftMatchStateHandler = createMatchStateHandler(
				leftType, leftMatchStateType, leftKeyType, "LeftMatchHandler", leftPkProjectCode);
		this.rightMatchStateHandler = createMatchStateHandler(
				rightType, rightMatchStateType, rightKeyType, "RightMatchHandler", rightPkProjectCode);
	}

	/**
	 * Cleans up the join state and match state of one side for the timer's key, if that side's
	 * cleanup timer says the state has expired.
	 *
	 * <p>Timer namespace {@code 1} selects the left side and any other value selects the right side.
	 * Presumably this mirrors how the parent's {@code registerProcessingCleanupTimer} encodes the
	 * side; TODO confirm.
	 */
	@Override
	public void onProcessingTime(InternalTimer<BaseRow, Byte> timer) throws Exception {
		byte namespace = timer.getNamespace();
		if (namespace == 1) {
			// left side
			if (needToCleanupState(timer.getKey(), timer.getTimestamp(), leftTimerState)) {
				leftMatchStateHandler.remove(timer.getKey());
				leftStateHandler.remove(timer.getKey());
			}
		} else {
			// right side
			if (needToCleanupState(timer.getKey(), timer.getTimestamp(), rightTimerState)) {
				rightMatchStateHandler.remove(timer.getKey());
				rightStateHandler.remove(timer.getKey());
			}
		}
	}

	/**
	 * Processes one accumulate or retract record arriving on one input and emits the resulting
	 * (possibly null-padded) join rows.
	 *
	 * @param input the incoming record; its header is temporarily rewritten but restored before return
	 * @param inputSideStateHandler join state of the side {@code input} arrived on
	 * @param otherSideStateHandler join state of the opposite side, probed for matches
	 * @param inputSideMatchStateHandler join-count state of the input side
	 * @param otherSideMatchStateHandler join-count state of the opposite side
	 * @param inputIsLeft whether {@code input} belongs to the left input (controls output column order)
	 * @param inputIsOuter whether the input side is an outer side (emit null-padded row when unmatched)
	 * @param otherSideIsOuter whether the opposite side is an outer side
	 * @param timerState cleanup-timer state of the input side
	 * @return always {@link TwoInputSelection#ANY}
	 */
	protected TwoInputSelection processElement(BaseRow input, JoinStateHandler inputSideStateHandler,
			JoinStateHandler otherSideStateHandler, JoinMatchStateHandler inputSideMatchStateHandler,
			JoinMatchStateHandler otherSideMatchStateHandler,
			boolean inputIsLeft, boolean inputIsOuter, boolean otherSideIsOuter,
			KeyedValueState<BaseRow, Long> timerState) throws Exception {
		long currentTime = internalTimerService.currentProcessingTime();
		inputSideStateHandler.extractCurrentJoinKey(input);
		inputSideStateHandler.extractCurrentPrimaryKey(input);
		// Computed from the input side's state *before* this record is added/retracted below.
		long possibleJoinCnt = getOtherSidePossibleMatchJoinCnt(otherSideMatchStateHandler, inputSideStateHandler,
				inputSideStateHandler.getCurrentJoinKey());

		long inputCount;
		if (BaseRowUtil.isRetractMsg(input)) {
			inputCount = inputSideStateHandler.retract(input);
		} else {
			// Accumulate: (re)register the cleanup timer, then store the row with its expiry time.
			registerProcessingCleanupTimer(inputSideStateHandler.getCurrentJoinKey(), currentTime, inputIsLeft,
					timerState);
			inputCount = inputSideStateHandler.add(input, currentTime + maxRetentionTime);
		}

		// Join results inherit the input's change flag unless explicitly overridden.
		byte reservedHeader = input.getHeader();
		joinedRow.setHeader(reservedHeader);

		Iterator<Tuple3<BaseRow, Long, Long>> iterator = otherSideStateHandler.getRecords(
				inputSideStateHandler.getCurrentJoinKey());
		long inputJoinOtherSideRowNum = 0;
		while (iterator.hasNext()) {
			// f0 = stored row, f1 = its count, f2 = its expiry time (compared to currentTime below).
			Tuple3<BaseRow, Long, Long> record = iterator.next();
			BaseRow matchRow = record.f0;
			long matchRowCount = record.f1;
			boolean isMatched = inputIsLeft
					? applyCondition(input, matchRow, inputSideStateHandler.getCurrentJoinKey())
					: applyCondition(matchRow, input, inputSideStateHandler.getCurrentJoinKey());
			if (isMatched) {
				inputJoinOtherSideRowNum += matchRowCount;
				collectJoinResult(input, inputSideStateHandler, matchRow, matchRowCount,
						inputIsLeft, otherSideIsOuter, otherSideMatchStateHandler, possibleJoinCnt);
			}
			if (record.f2 < currentTime && stateCleaningEnabled) {
				// Expired: drop the row from both the join state and the match state of the other side.
				iterator.remove();
				otherSideMatchStateHandler.remove(inputSideStateHandler.getCurrentJoinKey(), matchRow);
			}
		}

		if (inputJoinOtherSideRowNum == 0 && inputIsOuter) {
			// No match at all: an outer input is emitted padded with nulls for the other side.
			if (inputIsLeft) {
				collectResult(joinedRow.replace(input, rightSideNullRow));
			} else {
				collectResult(joinedRow.replace(leftSideNullRow, input));
			}
		}

		if (inputCount <= 0) {
			// Presumably no copies of this row remain on the input side, so its match entry is removed.
			// The removal is keyed with the accumulate header (presumably match state holds rows in
			// accumulate form; TODO confirm). The caller's header is restored even if removal fails.
			input.setHeader(ACCUMULATE_MSG);
			try {
				inputSideMatchStateHandler.remove(inputSideStateHandler.getCurrentJoinKey(), input);
			} finally {
				input.setHeader(reservedHeader);
			}
		} else if (inputCount == 1 && BaseRowUtil.isAccumulateMsg(input)) {
			// First copy of this row: seed its join count with the number of matched other-side rows.
			inputSideMatchStateHandler.updateRowMatchJoinCnt(
					inputSideStateHandler.getCurrentJoinKey(), input, inputJoinOtherSideRowNum);
		}
		return TwoInputSelection.ANY;
	}

	/**
	 * Emits the join rows of {@code input} with one matched other-side row and updates the matched
	 * row's join count by +1 (accumulate input) or -1 (retract input).
	 *
	 * <p>If the matched side is outer:
	 * <ul>
	 *     <li>when its join count is 0 before an accumulate input, its null-padded rows are retracted
	 *     first;</li>
	 *     <li>when its join count drops to 0 after a retract input, its null-padded rows are re-emitted.</li>
	 * </ul>
	 * Every emission is repeated {@code matchedRowCount} times.
	 */
	private void collectJoinResult(
			BaseRow input, JoinStateHandler inputSideStateHandler, BaseRow matchedRow, long matchedRowCount,
			boolean inputIsLeft, boolean matchSideIsOuter, JoinMatchStateHandler matchSideMatchStateHandler,
			long possibleJoinCnt) {
		byte reservedHeader = input.getHeader();
		BaseRow currentJoinKey = inputSideStateHandler.getCurrentJoinKey();
		matchSideMatchStateHandler.extractCurrentRowMatchJoinCount(currentJoinKey, matchedRow, possibleJoinCnt);

		if (matchSideIsOuter
				&& matchSideMatchStateHandler.getCurrentRowMatchJoinCnt() == 0
				&& BaseRowUtil.isAccumulateMsg(input)) {
			// First match for the outer row: retract the null-padded rows emitted earlier.
			joinedRow.setHeader(RETRACT_MSG);
			collectMatchedRowWithNullInputSide(matchedRow, matchedRowCount, inputIsLeft);
			joinedRow.setHeader(reservedHeader);
		}

		long appendJoinCnt = BaseRowUtil.isRetractMsg(input) ? -1L : 1L;
		long updateJoinCnt = matchSideMatchStateHandler.getCurrentRowMatchJoinCnt() + appendJoinCnt;
		matchSideMatchStateHandler.resetCurrentRowMatchJoinCnt(updateJoinCnt);

		collectJoinedRows(input, matchedRow, matchedRowCount, inputIsLeft);

		if (matchSideIsOuter
				&& matchSideMatchStateHandler.getCurrentRowMatchJoinCnt() == 0
				&& BaseRowUtil.isRetractMsg(input)) {
			// Last match for the outer row was retracted: emit its null-padded rows again.
			joinedRow.setHeader(ACCUMULATE_MSG);
			collectMatchedRowWithNullInputSide(matchedRow, matchedRowCount, inputIsLeft);
			joinedRow.setHeader(reservedHeader);
		}
	}

	/**
	 * Emits {@code count} copies of {@code matchedRow} paired with nulls in the input side's columns,
	 * using whatever header {@code joinedRow} currently carries. The counter is a {@code long} so huge
	 * counts cannot overflow it.
	 */
	private void collectMatchedRowWithNullInputSide(BaseRow matchedRow, long count, boolean inputIsLeft) {
		for (long i = 0; i < count; i++) {
			if (inputIsLeft) {
				collectResult(joinedRow.replace(leftSideNullRow, matchedRow));
			} else {
				collectResult(joinedRow.replace(matchedRow, rightSideNullRow));
			}
		}
	}

	/**
	 * Emits {@code count} copies of the pair ({@code input}, {@code matchedRow}) in left/right column
	 * order, using whatever header {@code joinedRow} currently carries.
	 */
	private void collectJoinedRows(BaseRow input, BaseRow matchedRow, long count, boolean inputIsLeft) {
		for (long i = 0; i < count; i++) {
			if (inputIsLeft) {
				collectResult(joinedRow.replace(input, matchedRow));
			} else {
				collectResult(joinedRow.replace(matchedRow, input));
			}
		}
	}

	/**
	 * Returns a capped count of the input side's stored rows for {@code joinKey}. It is computed only
	 * when the other side's match handler is a {@link NonBatchOnlyEqualityConditionMatchStateHandler};
	 * otherwise it returns 0.
	 *
	 * <p>Counting stops as soon as the running sum reaches 2, so the result is exact below 2 and only a
	 * lower bound (>= 2) otherwise. Presumably that handler only needs to tell 0, 1 and "many" apart;
	 * TODO confirm.
	 */
	private long getOtherSidePossibleMatchJoinCnt(JoinMatchStateHandler otherSideStateHandler,
			JoinStateHandler inputSideStateHandler, BaseRow joinKey) {
		if (!(otherSideStateHandler instanceof NonBatchOnlyEqualityConditionMatchStateHandler)) {
			// Other handlers don't use the possible join count.
			return 0L;
		}
		Iterator<Tuple3<BaseRow, Long, Long>> iterator = inputSideStateHandler.getRecords(joinKey);
		long matchJoinCount = 0;
		while (iterator.hasNext() && matchJoinCount < 2) {
			matchJoinCount += iterator.next().f1;
		}
		return matchJoinCount;
	}

	/** Forwards a result row to the operator's output collector. */
	private void collectResult(BaseRow row) {
		collector.collect(row);
	}
}