blob: 61c3699c650b107de278a62858a7ee24e390747f [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.table.runtime.join.batch;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.codegen.GeneratedSorter;
import org.apache.flink.table.codegen.JoinConditionFunction;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BinaryRowUtil;
import org.apache.flink.table.plan.FlinkJoinRelType;
import org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics;
import org.apache.flink.table.runtime.sort.RecordComparator;
import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.codehaus.commons.compiler.CompileException;
import java.util.BitSet;
import java.util.List;
import static org.apache.flink.table.codegen.CodeGenUtils.compile;
* An implementation that realizes the joining through a merge join strategy.
* This requires the two input to be sorted on the same key in advance.
* <p>This operator maintains one buffer for each input.
* The `current` base row in each buffer serves as a pointer to the currently compared row.</p>
* <p>This operator contains two stage, the "find" stage and the "merge" stage.</p>
* <p>1. The "find" stage</p>
* <p>The goal of this stage is to find two rows with the same join key in each buffer.</p>
* <p>When receiving a row from the input, we store the row into the corresponding buffer,
* and compare the join key of the `current` rows in each buffer.
* The buffer with the smaller join key should advance its `current` row.
* If the join keys of the two `current` rows are the same, we will move into the "merge" stage.</p>
* <p>2. The "merge" stage</p>
* <p>The goal of this stage is to join the rows with the same join key discovered in the "find" stage.</p>
* <p>We will select a buffer as the build buffer, and the other buffer as the probe buffer.
* We will then wait for all rows with the same join key to arrive at the build buffer.
* (If a row arrives at the probe buffer, we will of course store it,
* but no further operations will be done until all build rows are received).
* After that, we will iterate through the probe buffer and check if the rows can be joined.</p>
public class MergeJoinOperator extends AbstractStreamOperatorWithMetrics<BaseRow>
implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> {
private final long leftBufferMemory;
private final long rightBufferMemory;
private final FlinkJoinRelType type;
private final GeneratedJoinConditionFunction condFuncCode;
private final GeneratedProjection projectionCode1;
private final GeneratedProjection projectionCode2;
private final GeneratedSorter keyGSorter;
private transient JoinConditionFunction condFunc;
private transient RecordComparator keyComparator;
private transient Collector<BaseRow> collector;
private transient boolean isFinished1;
private transient boolean isFinished2;
private transient AbstractRowSerializer<BaseRow> serializer1;
private transient AbstractRowSerializer<BaseRow> serializer2;
private transient WrappedBuffer buffer1;
private transient WrappedBuffer buffer2;
private transient BaseRow leftNullRow;
private transient BaseRow rightNullRow;
private transient JoinedRow joinedRow;
private transient boolean isFindStage;
private transient boolean advance1;
private transient boolean advance2;
private transient BinaryRow mergeKey;
private transient int mergeCount1;
private transient int mergeCount2;
private transient BitSet mergeBs1;
private transient BitSet mergeBs2;
private transient boolean leftIsBuild;
private transient MemoryManager memManager;
private transient IOManager ioManager;
private final int[] nullFilterKeys;
private final boolean nullSafe;
private final boolean filterAllNulls;
public MergeJoinOperator(
long leftBufferMemory,
long rightBufferMemory,
FlinkJoinRelType type,
GeneratedJoinConditionFunction condFuncCode,
GeneratedProjection projectionCode1,
GeneratedProjection projectionCode2,
GeneratedSorter keyGSorter,
boolean[] filterNulls) {
if (type != FlinkJoinRelType.INNER
&& type != FlinkJoinRelType.LEFT
&& type != FlinkJoinRelType.RIGHT
&& type != FlinkJoinRelType.FULL) {
throw new RuntimeException(
"Merge join operator only supports inner/left outer/right outer/full outer join currently.");
"Initializing merge join operator...\n" +
"leftBufferMemory = " + leftBufferMemory + ", " +
"rightBufferMemory = " + rightBufferMemory);
this.leftBufferMemory = leftBufferMemory;
this.rightBufferMemory = rightBufferMemory;
this.type = type;
this.condFuncCode = condFuncCode;
this.projectionCode1 = projectionCode1;
this.projectionCode2 = projectionCode2;
this.keyGSorter = keyGSorter;
this.nullFilterKeys = NullAwareJoinHelper.getNullFilterKeys(filterNulls);
this.nullSafe = nullFilterKeys.length == 0;
this.filterAllNulls = nullFilterKeys.length == filterNulls.length;
public void open() throws Exception {;
isFinished1 = false;
isFinished2 = false;
collector = new StreamRecordCollector<>(output);
serializer1 = (AbstractRowSerializer) getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
serializer2 = (AbstractRowSerializer) getOperatorConfig().getTypeSerializerIn2(getUserCodeClassloader());
leftNullRow = new GenericRow(serializer1.getNumFields());
rightNullRow = new GenericRow(serializer2.getNumFields());
joinedRow = new JoinedRow();
CookedClasses classes = cookGeneratedClasses(getContainingTask().getUserCodeClassLoader());
condFunc = classes.condFuncClass.newInstance();
keyComparator = classes.keyComparatorClass.newInstance();
keyComparator.init(keyGSorter.serializers(), keyGSorter.comparators());
memManager = getContainingTask().getEnvironment().getMemoryManager();
ioManager = getContainingTask().getEnvironment().getIOManager();
Projection<BaseRow, BinaryRow> projection1 = classes.projectionClass1.newInstance();
Projection<BaseRow, BinaryRow> projection2 = classes.projectionClass2.newInstance();
int pageNum1 = (int) (leftBufferMemory / memManager.getPageSize());
List<MemorySegment> mem1 = memManager.allocatePages(getContainingTask(), pageNum1);
buffer1 = new WrappedBuffer(mem1, serializer1, projection1);
int pageNum2 = (int) (rightBufferMemory / memManager.getPageSize());
List<MemorySegment> mem2 = memManager.allocatePages(getContainingTask(), pageNum2);
buffer2 = new WrappedBuffer(mem2, serializer2, projection2);
protected CookedClasses cookGeneratedClasses(ClassLoader cl) throws CompileException {
return new CookedClasses(
compile(cl,, condFuncCode.code()),
compile(cl, keyGSorter.comparator().name(), keyGSorter.comparator().code()),
compile(cl,, projectionCode1.code()),
compile(cl,, projectionCode2.code())
private void initJoin() {
advance1 = true;
advance2 = true;
isFindStage = true;
if (type.isLeftOuter()) {
mergeBs1 = new BitSet();
if (type.isRightOuter()) {
mergeBs2 = new BitSet();
private void initGauge() {
(Gauge<Long>) () -> buffer1.getUsedMemoryInBytes() + buffer2.getUsedMemoryInBytes());
(Gauge<Integer>) () -> buffer1.getNumSpillFiles() + buffer2.getNumSpillFiles());
(Gauge<Long>) () -> buffer1.getSpillInBytes() + buffer2.getSpillInBytes());
public TwoInputSelection firstInputSelection() {
return TwoInputSelection.ANY;
public TwoInputSelection processElement1(StreamRecord<BaseRow> record) throws Exception {
// must materialize cache immediately after the operation,
// because the record will be reused and the data it contains will be changed
return TwoInputSelection.ANY;
public TwoInputSelection processElement2(StreamRecord<BaseRow> record) throws Exception {
// must materialize cache immediately after the operation,
// because the record will be reused and the data it contains will be changed
return TwoInputSelection.ANY;
public void endInput1() throws Exception {
isFinished1 = true;
if (isAllFinished()) {
public void endInput2() throws Exception {
isFinished2 = true;
if (isAllFinished()) {
public void close() throws Exception {
if (buffer1 != null) {
if (buffer2 != null) {
private boolean isAllFinished() {
return isFinished1 && isFinished2;
private void runJoin() throws Exception {
boolean stepResult = true;
while (stepResult) {
if (isFindStage) {
stepResult = runFindStep();
} else {
stepResult = runMergeStep();
* Do one step of the "find" stage.
* <p>It will advance the current row of the buffer with the smaller join key,
* and then compare the join keys of the current rows in two buffers.</p>
private boolean runFindStep() {
if ((advance1 && !buffer1.hasNext())
|| (advance2 && !buffer2.hasNext())) {
return false;
BaseRow row1 = advance1 ? buffer1.nextRow() : buffer1.current;
BaseRow row2 = advance2 ? buffer2.nextRow() : buffer2.current;
boolean end1 = isEndRow(row1);
boolean end2 = isEndRow(row2);
int cmp;
if (end1 && end2) {
return false;
} else if (end1) {
cmp = 1;
} else if (end2) {
cmp = -1;
} else if (buffer1.currentKeyShouldFilter) {
cmp = -1;
} else if (buffer2.currentKeyShouldFilter) {
cmp = 1;
} else {
BinaryRow key1 = buffer1.currentKey;
BinaryRow key2 = buffer2.currentKey;
cmp =, key2);
if (cmp < 0) {
// `row1` is smaller, so buffer1 should advance
advance1 = true;
advance2 = false;
if (type.isLeftOuter()) {
collect(row1, rightNullRow);
} else if (cmp > 0) {
// `row2` is smaller, so buffer2 should advance
advance1 = false;
advance2 = true;
if (type.isRightOuter()) {
collect(leftNullRow, row2);
} else {
// same join key found
// find stage end, go to merge stage
return true;
private void initMergeStage() {
isFindStage = false;
// selects the build buffer
// only after all bulid rows in the build buffer arrive,
// can we iterate through the probe buffer and check for joins.
if (leftIsBuild = buffer1.isBuild()) {
advance1 = true;
advance2 = false;
} else {
advance1 = false;
advance2 = true;
mergeKey = buffer1.currentKey.copy();
mergeCount1 = 0;
mergeCount2 = 0;
if (mergeBs1 != null) {
if (mergeBs2 != null) {
* Do one step of the "merge" stage.
* <p>We will first try to collect the build rows in the build buffer,
* until a row with different join key is received.
* We will then iterate through the probe buffer and check for joins.</p>
private boolean runMergeStep() throws Exception {
if (advance1 && buffer1.hasNext()) {
if (leftIsBuild) {
advance1 = checkNextRowIsSameKey(buffer1);
if (!advance1 && leftIsBuild) {
// A different join key is read from the build buffer.
// This means we've received all build rows, so the probe side can go on.
// At this time, all build rows are in `externalBuffer` of the build buffer,
// and the `cache` in build buffer contains the first row with different join key.
advance2 = true;
} else {
advance1 = joinCurrentProbeRow(buffer1, buffer2, mergeCount2);
return true;
} else if (advance2 && buffer2.hasNext()) {
if (leftIsBuild) {
advance2 = joinCurrentProbeRow(buffer2, buffer1, mergeCount1);
} else {
advance2 = checkNextRowIsSameKey(buffer2);
if (!advance2 && !leftIsBuild) {
// A different join key is read from the build buffer.
// This means we've received all build rows, so the probe side can go on.
// At this time, all build rows are in `externalBuffer` of the build buffer,
// and the `cache` in build buffer contains the first row with different join key.
advance1 = true;
return true;
} else if (!advance1 && !advance2) {
// merge stage end, go to find stage
return true;
} else {
return false;
private boolean checkNextRowIsSameKey(WrappedBuffer buffer) {
BaseRow row = buffer.nextRow();
if (isEndRow(row)) {
return false;
} else {
BinaryRow key = buffer.currentKey;
// Here we suppose that
// `, key) == 0` is the same as `mergeKey.equals(key)`.
// `mergeKey.equals(key)` is faster, because `mergeKey` is a binary row,
// and the equality is compared by binary blocks, not by columns.
// If they are not the same in the future, please modify this.
return !buffer.currentKeyShouldFilter && mergeKey.equals(key);
* Check for joins using the current probe row and all build rows.
* After checking, we will advance the current probe row,
* and if the join key of the next row is not the same, the "merge" stage will end.
private boolean joinCurrentProbeRow(
WrappedBuffer probeBuffer, WrappedBuffer buildBuffer, int buildCount) throws Exception {
BaseRow probeRow = probeBuffer.current;
boolean matched = false;
ResettableExternalBuffer.BufferIterator buildIter = buildBuffer.externalIterator;
for (int index = 0; index < buildCount; index++) {
boolean result = buildIter.advanceNext();
Preconditions.checkState(result, "There is no next row in build buffer. This is a bug.");
BinaryRow buildRow = buildIter.getRow();
if (leftIsBuild) {
if (condFunc.apply(buildRow, probeRow)) {
matched = true;
collect(buildRow, probeRow);
if (mergeBs1 != null) {
} else {
if (condFunc.apply(probeRow, buildRow)) {
matched = true;
collect(probeRow, buildRow);
if (mergeBs2 != null) {
// After the iteration above,
// the current row of `buildBuffer.externalIterator` is the last build row.
// for the probe row of outer joins
if (!matched) {
if (mergeBs1 != null && !leftIsBuild) {
collect(probeRow, rightNullRow);
} else if (mergeBs2 != null && leftIsBuild) {
collect(leftNullRow, probeRow);
return checkNextRowIsSameKey(probeBuffer);
private void endMergeStage() throws IOException {
if (leftIsBuild && buffer1.externalIterator.rowInSpill(buffer1.externalIterator.getBeginRow())) {
LOG.warn("(In merge join operator) Build side iterator is in spilled file, " +
"this may decrease performance.");
} else if (!leftIsBuild && buffer2.externalIterator.rowInSpill(buffer2.externalIterator.getBeginRow())) {
LOG.warn("(In merge join operator) Build side iterator is in spilled file, " +
"this may decrease performance.");
// for the build rows of outer joins
if (leftIsBuild) {
if (mergeBs1 != null) {
for (int i = 0; i < mergeCount1; i++) {
if (!mergeBs1.get(i)) {
collect(buffer1.externalIterator.getRow(), rightNullRow);
} else {
if (mergeBs2 != null) {
for (int i = 0; i < mergeCount2; i++) {
if (!mergeBs2.get(i)) {
collect(leftNullRow, buffer2.externalIterator.getRow());
isFindStage = true;
advance1 = false;
advance2 = false;
if (leftIsBuild) {
} else {
private void collect(BaseRow row1, BaseRow row2) {
collector.collect(joinedRow.replace(row1, row2));
private static boolean isEndRow(BaseRow row) {
return row.getArity() == 0;
private boolean shouldFilterNull(BinaryRow key) {
return NullAwareJoinHelper.shouldFilter(nullSafe, filterAllNulls, nullFilterKeys, key);
* Generated classes.
protected static class CookedClasses {
protected CookedClasses(
Class<JoinConditionFunction> condFuncClass,
Class<RecordComparator> keyComparatorClass,
Class<Projection> projectionClass1,
Class<Projection> projectionClass2) {
this.condFuncClass = condFuncClass;
this.keyComparatorClass = keyComparatorClass;
this.projectionClass1 = projectionClass1;
this.projectionClass2 = projectionClass2;
protected final Class<JoinConditionFunction> condFuncClass;
protected final Class<RecordComparator> keyComparatorClass;
protected final Class<Projection> projectionClass1;
protected final Class<Projection> projectionClass2;
* A helper buffer class for merge join operator.
* <p>This "buffer" contains a {@link BaseRow} and a real {@link ResettableExternalBuffer}.</p>
* <p>One can regard this buffer as a queue,
* where the head of the resettable external buffer is the head of the queue,
* and the base row is the tail of the queue.</p>
* <p>The base row is only a reference to the reusable inputs of the operator.
* If the content of the base row should be reserved for later usage,
* it must be materialized to the resettable external buffer.</p>
* <p>This buffer is designed to reduce copying cost, and is sort of hack...</p>
private class WrappedBuffer {
// This `current` base row points to the head of the buffer.
private BaseRow current = null;
// `currentKey` contains the join key of `current`.
private BinaryRow currentKey = null;
private Projection<BaseRow, BinaryRow> projection;
private boolean currentKeyShouldFilter = true;
// This `cache` base row is the tail of the queue.
// It is only a reference to the reusable inputs of the operator.
private BaseRow cache = null;
// This `externalBuffer` contains the contents of the materialized `cache`s.
// It's head is the head of the buffer.
private ResettableExternalBuffer externalBuffer;
// This `externalIterator` is used to read the contents of `externalBuffer`.
// Only after all the records in `externalBuffer` are consumed
// can one read the content of `cache` using the `nextRow()` method.
private ResettableExternalBuffer.BufferIterator externalIterator;
private WrappedBuffer(
List<MemorySegment> memory,
AbstractRowSerializer serializer,
Projection<BaseRow, BinaryRow> projection) {
this.projection = projection;
this.externalBuffer = new ResettableExternalBuffer(memManager, ioManager, memory, serializer);
this.externalIterator = this.externalBuffer.newIterator();
* Adds a record to the end of the queue (that is to say, the `cache` base row).
* <p>Note that before adding, the `cache` base row must be empty.
* Otherwise the record it contains will be lost.</p>
private void add(BaseRow row) {
cache == null, "Old cache must be materialized to buffer. This is a bug.");
cache = row;
* Materialize the content of `cache` into `externalBuffer`
* (so that the content is preserved after `cache` is reused by the input of the operator).
private void materializeCache() throws IOException {
if (cache != null) {
if (isCacheJustRead()) {
// This condition means that cache is just read.
// As cache is read, we should not return its content again when we call `getNext()` method.
// So the newly materialized `cache` should be skipped.
// The content of `externalIterator.getRow()` is the same with `cache` currently.
// But we have to do this, becuase the content of `cache` will change after being reused.
current = externalIterator.getRow();
cache = null;
* Try to discard all records in the buffer.
* <p>This method is called by the operator
* if all the contents in this buffer will no longer be used.</p>
private void discard() {
// As we cannot discard some part of the `externalBuffer` (we can only discard all or none),
// we require all records in `externalBuffer` to be consumed
// (that is to say, we've just read the `cache`) before discarding.
if (isCacheJustRead()) {
private boolean isCacheJustRead() {
return current == cache;
private boolean hasNext() {
return (cache != null && current != cache) || externalIterator.hasNext();
* Get the next unread row of the buffer.
* <p>It will first try to consume the records in `externalBuffer`.
* If all records in `externalBuffer` are consumed, it will return the `cache`.</p>
* <p>The content of `currentKey` will also be calculated.</p>
private BaseRow nextRow() {
if (externalIterator.hasNext()) {
current = externalIterator.getRow();
} else {
current = cache;
if (current == null || isEndRow(current)) {
currentKey = null;
currentKeyShouldFilter = true;
} else {
currentKey = projection.apply(current);
currentKeyShouldFilter = shouldFilterNull(currentKey);
return current;
* Reset this buffer to an empty buffer.
private void reset() {
current = null;
currentKey = null;
cache = null;
private void close() {
* Clear all contents in `externalBuffer`, but do nothing to `cache` and `current`.
private void clearBuffer() {
if (externalBuffer.size() > 0) {
externalIterator = externalBuffer.newIterator();
private boolean isBuild() {
// Due to the property of merge join, one of the two buffers has just read its `cache`.
// So we select this buffer to be the build buffer.
// Why don't we use `!hasNext()` instead of `current == cache`?
// Consider this situation:
// 1. Record ('A', 1) is received from input 1 ('A' is the key).
// As nothing can be done currently, ('A', 1) is materialized to `externalBuffer`.
// 2. Record ('A', 2) is received from input 2, and is stored in its `cache`.
// Both ('A', 1) and ('A', 2) are read, and a merge is going to take place.
// 3. The `hasNext()` method of both buffers for input 1 and input 2 will return false,
// Because their records are read in step 2. If we select buffer 1 as the build buffer,
// the content in its `externalBuffer` will be wrongly cleared.
// Here we should select buffer 2 as the build buffer,
// because the first record to be merged is in its `cache`.
return current == cache;
private void enterBuildMode() {
// All records except the one in `cache` is of no use now.
// So we clear the content of the `externalBuffer` to store all the build rows.
// `externalIterator` can be used to iterate through all build rows in the merge stage.
private void leaveBuildMode() {
if (externalIterator.hasNext()) {
// The first "non-build" row is not in `cache`,
// so we can't clear the external buffer
// and should move one step forward to the first "non-build" row.
} else {
// The first "non-build" row is in `cache`, so the records in external buffer are of no use.
// ==================== metrics ====================
private long getUsedMemoryInBytes() {
return externalBuffer.getUsedMemoryInBytes();
private int getNumSpillFiles() {
return externalBuffer.getNumSpillFiles();
private long getSpillInBytes() {
return externalBuffer.getSpillInBytes();