blob: 71479abd890ab5481a2792d5b5a75a0cec68c9e9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.join.batch;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.GeneratedJoinConditionFunction;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.codegen.GeneratedSorter;
import org.apache.flink.table.codegen.JoinConditionFunction;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.plan.FlinkJoinRelType;
import org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics;
import org.apache.flink.table.runtime.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.sort.NormalizedKeyComputer;
import org.apache.flink.table.runtime.sort.RecordComparator;
import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.codehaus.commons.compiler.CompileException;
import java.io.IOException;
import java.util.List;
import static org.apache.flink.table.codegen.CodeGenUtils.compile;
/**
 * An implementation that realizes the join through a sort-merge join strategy.
 * This operator sorts only one of its inputs; the other input must already be sorted
 * on the same join key before it reaches this operator.
 */
public class OneSideSortMergeJoinOperator extends AbstractStreamOperatorWithMetrics<BaseRow>
		implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> {

	// Memory budgets, in bytes. reserved/max bound the external sorter, perRequest is the
	// sorter's incremental allocation unit, and the two buffer budgets size the external
	// buffers used for the probe side and for the join iterator's spooling buffer.
	private final long reservedSortMemory;
	private final long maxSortMemory;
	private final long perRequestMemory;
	private final long probeBufferMemory;
	private final long joinBufferMemory;

	// Only INNER / LEFT / RIGHT are accepted (validated in the constructor).
	private final FlinkJoinRelType type;

	// True when the LEFT input is the one this operator must sort; the opposite input is
	// then the probe side and is expected to arrive already sorted on the join key.
	private final boolean leftNeedsSort;

	// Per-key flags: whether a null key value should filter the row out of the join.
	private final boolean[] filterNulls;

	// Generated code fragments compiled in open(); all are constructor-supplied and final.
	private final GeneratedJoinConditionFunction condFuncCode;
	private final GeneratedProjection projectionCode1;
	private final GeneratedProjection projectionCode2;
	private final GeneratedSorter gSorter;
	private final GeneratedSorter keyGSorter;

	// Classes compiled from the generated code above.
	private transient CookedClasses classes;

	private transient MemoryManager memManager;
	private transient IOManager ioManager;

	// Serializers for input 1 (left) and input 2 (right).
	private transient AbstractRowSerializer inputSerializer1;
	private transient AbstractRowSerializer inputSerializer2;
	private transient BinaryRowSerializer serializer1;
	private transient BinaryRowSerializer serializer2;

	// Sorter for the side that needs sorting.
	private transient BinaryExternalSorter sorter;

	// Current probe-side row being joined once the sorted side has finished.
	private transient BaseRow probeRow;

	// Buffers probe-side rows that arrive before the sorted side has finished.
	private transient ResettableExternalBuffer probeBuffer;
	private transient ProbeIterator probeIter;
	private transient SortMergeJoinIterator joinIterator;

	// Null-padding rows used by the outer-join variants.
	private transient BaseRow leftNullRow;
	private transient BaseRow rightNullRow;

	// End-of-input markers for the two inputs.
	private transient boolean isFinished1;
	private transient boolean isFinished2;

	private transient SortMergeJoinHelper helper;

	/**
	 * Creates the operator.
	 *
	 * @param reservedSortMemory reserved memory for the external sorter, in bytes
	 * @param maxSortMemory     preferred (maximum) memory for the external sorter, in bytes
	 * @param perRequestMemory  memory allocated per request by the sorter, in bytes
	 * @param probeBufferMemory memory for buffering probe-side rows, in bytes
	 * @param joinBufferMemory  memory for the join iterator's buffer, in bytes
	 * @param type              join type; must be INNER, LEFT or RIGHT
	 * @param leftNeedsSort     true if the left input must be sorted by this operator
	 * @param condFuncCode      generated join condition
	 * @param projectionCode1   generated key projection for input 1
	 * @param projectionCode2   generated key projection for input 2
	 * @param gSorter           generated sorter (computer + comparator) for the sorted side
	 * @param keyGSorter        generated comparator over the join key
	 * @param filterNulls       per-key null-filtering flags
	 * @throws RuntimeException if {@code type} is not INNER, LEFT or RIGHT
	 */
	public OneSideSortMergeJoinOperator(
			long reservedSortMemory,
			long maxSortMemory,
			long perRequestMemory,
			long probeBufferMemory,
			long joinBufferMemory,
			FlinkJoinRelType type,
			boolean leftNeedsSort,
			GeneratedJoinConditionFunction condFuncCode,
			GeneratedProjection projectionCode1,
			GeneratedProjection projectionCode2,
			GeneratedSorter gSorter,
			GeneratedSorter keyGSorter,
			boolean[] filterNulls) {
		if (type != FlinkJoinRelType.INNER
				&& type != FlinkJoinRelType.LEFT
				&& type != FlinkJoinRelType.RIGHT) {
			throw new RuntimeException("One side sort merge join operator only supports " +
					"inner/left outer/right outer join currently.");
		}
		LOG.info(
				"Initializing one side sort merge join operator...\n" +
						"leftNeedsSort = " + leftNeedsSort + ", " +
						"reservedSortMemory = " + reservedSortMemory + ", " +
						"preferredSortMemory = " + maxSortMemory + ", " +
						"perRequestMemory = " + perRequestMemory + ", " +
						"probeBufferMemory = " + probeBufferMemory + ", " +
						"joinBufferMemory = " + joinBufferMemory);
		this.reservedSortMemory = reservedSortMemory;
		this.maxSortMemory = maxSortMemory;
		this.perRequestMemory = perRequestMemory;
		this.probeBufferMemory = probeBufferMemory;
		this.joinBufferMemory = joinBufferMemory;
		this.type = type;
		this.leftNeedsSort = leftNeedsSort;
		this.filterNulls = filterNulls;
		this.condFuncCode = condFuncCode;
		this.projectionCode1 = projectionCode1;
		this.projectionCode2 = projectionCode2;
		this.gSorter = gSorter;
		this.keyGSorter = keyGSorter;
	}

	@Override
	public void open() throws Exception {
		super.open();
		isFinished1 = false;
		isFinished2 = false;
		// Compile all generated code against the user-code classloader.
		classes = cookGeneratedClasses(getContainingTask().getUserCodeClassLoader());
		Collector<BaseRow> collector = new StreamRecordCollector<>(output);
		inputSerializer1 = (AbstractRowSerializer) getOperatorConfig()
				.getTypeSerializerIn1(getUserCodeClassloader());
		serializer1 = new BinaryRowSerializer(inputSerializer1.getTypes());
		inputSerializer2 = (AbstractRowSerializer) getOperatorConfig()
				.getTypeSerializerIn2(getUserCodeClassloader());
		serializer2 = new BinaryRowSerializer(inputSerializer2.getTypes());
		memManager = this.getContainingTask().getEnvironment().getMemoryManager();
		ioManager = this.getContainingTask().getEnvironment().getIOManager();
		JoinConditionFunction condFunc = classes.condFuncClass.newInstance();
		// Null-padding rows for the outer-join variants.
		leftNullRow = new GenericRow(serializer1.getNumFields());
		rightNullRow = new GenericRow(serializer2.getNumFields());
		JoinedRow joinedRow = new JoinedRow();
		helper =
				new SortMergeJoinHelper(collector, condFunc, leftNullRow, rightNullRow, joinedRow);
		initSorter();
		// The probe buffer holds rows of the side that is NOT sorted by this operator.
		probeBuffer =
				newBuffer(probeBufferMemory, leftNeedsSort ? inputSerializer2 : inputSerializer1);
		initGauge();
	}

	/**
	 * Compiles all generated code into classes.
	 *
	 * @param cl classloader to compile against
	 * @return the compiled classes bundle
	 * @throws CompileException if any generated source fails to compile
	 */
	protected CookedClasses cookGeneratedClasses(ClassLoader cl) throws CompileException {
		return new CookedClasses(
				compile(cl, condFuncCode.name(), condFuncCode.code()),
				compile(cl, keyGSorter.comparator().name(), keyGSorter.comparator().code()),
				compile(cl, projectionCode1.name(), projectionCode1.code()),
				compile(cl, projectionCode2.name(), projectionCode2.code()),
				compile(cl, gSorter.computer().name(), gSorter.computer().code()),
				compile(cl, gSorter.comparator().name(), gSorter.comparator().code())
		);
	}

	/** Registers memory/spill gauges aggregating the sorter and the probe buffer. */
	private void initGauge() {
		getMetricGroup().gauge(
				"memoryUsedSizeInBytes",
				(Gauge<Long>) () -> sorter.getUsedMemoryInBytes() + probeBuffer.getUsedMemoryInBytes());
		getMetricGroup().gauge(
				"numSpillFiles",
				(Gauge<Long>) () -> sorter.getNumSpillFiles() + probeBuffer.getNumSpillFiles());
		getMetricGroup().gauge(
				"spillInBytes",
				(Gauge<Long>) () -> sorter.getSpillInBytes() + probeBuffer.getSpillInBytes());
	}

	/** Creates and starts the external sorter for whichever input needs sorting. */
	private void initSorter() throws Exception {
		NormalizedKeyComputer computer = classes.computerClass.newInstance();
		computer.init(gSorter.serializers(), gSorter.comparators());
		RecordComparator comparator = classes.comparatorClass.newInstance();
		comparator.init(gSorter.serializers(), gSorter.comparators());
		AbstractRowSerializer inputSerializer;
		BinaryRowSerializer serializer;
		if (leftNeedsSort) {
			inputSerializer = inputSerializer1;
			serializer = serializer1;
		} else {
			inputSerializer = inputSerializer2;
			serializer = serializer2;
		}
		sorter = new BinaryExternalSorter(this.getContainingTask(),
				memManager, reservedSortMemory, maxSortMemory, perRequestMemory,
				ioManager, inputSerializer, serializer, computer, comparator, getSqlConf());
		this.sorter.startThreads();
	}

	/**
	 * Builds the join iterator once the sorted side has finished. The iterator pairs the
	 * (already sorted) probe stream with the sorter's output; argument order depends on
	 * which side was sorted, so the probe side always comes first.
	 */
	private void initJoinIterator() throws Exception {
		Projection projection1 = classes.projectionClass1.newInstance();
		Projection projection2 = classes.projectionClass2.newInstance();
		probeIter = new ProbeIterator();
		MutableObjectIterator<BinaryRow> sortIter = sorter.getIterator();
		RecordComparator keyComparator = classes.keyComparatorClass.newInstance();
		keyComparator.init(keyGSorter.serializers(), keyGSorter.comparators());
		if (type == FlinkJoinRelType.INNER) {
			if (leftNeedsSort) {
				joinIterator = new SortMergeInnerJoinIterator(
						serializer2, serializer1, projection2, projection1, keyComparator,
						probeIter, sortIter, newBuffer(joinBufferMemory, serializer1), filterNulls);
			} else {
				joinIterator = new SortMergeInnerJoinIterator(
						serializer1, serializer2, projection1, projection2, keyComparator,
						probeIter, sortIter, newBuffer(joinBufferMemory, serializer2), filterNulls);
			}
		} else if (type == FlinkJoinRelType.LEFT) {
			joinIterator = new SortMergeOneSideOuterJoinIterator(
					serializer1, serializer2, projection1, projection2, keyComparator,
					probeIter, sortIter, newBuffer(joinBufferMemory, serializer2), filterNulls);
		} else if (type == FlinkJoinRelType.RIGHT) {
			joinIterator = new SortMergeOneSideOuterJoinIterator(
					serializer2, serializer1, projection2, projection1, keyComparator,
					probeIter, sortIter, newBuffer(joinBufferMemory, serializer1), filterNulls);
		} else {
			throw new RuntimeException("Not support type: " + type);
		}
	}

	@Override
	public TwoInputSelection firstInputSelection() {
		return TwoInputSelection.ANY;
	}

	@Override
	public TwoInputSelection processElement1(StreamRecord<BaseRow> record) throws Exception {
		if (leftNeedsSort) {
			// Left input is the sorted side: feed the sorter.
			sorter.write(record.getValue());
		} else {
			// Left input is the probe side: join immediately if the sorted side is done,
			// otherwise buffer the row until it is.
			BaseRow row = record.getValue();
			if (isFinished2) {
				probeRow = row;
				runJoin();
			} else {
				probeBuffer.add(row);
			}
		}
		return TwoInputSelection.ANY;
	}

	@Override
	public TwoInputSelection processElement2(StreamRecord<BaseRow> record) throws Exception {
		if (leftNeedsSort) {
			// Right input is the probe side (mirror of processElement1).
			BaseRow row = record.getValue();
			if (isFinished1) {
				probeRow = row;
				runJoin();
			} else {
				probeBuffer.add(row);
			}
		} else {
			sorter.write(record.getValue());
		}
		return TwoInputSelection.ANY;
	}

	@Override
	public void endInput1() throws Exception {
		isFinished1 = true;
		if (leftNeedsSort) {
			// Sorted side finished: build the iterator and drain buffered probe rows.
			initJoinIterator();
			runJoin();
		}
	}

	@Override
	public void endInput2() throws Exception {
		isFinished2 = true;
		if (!leftNeedsSort) {
			initJoinIterator();
			runJoin();
		}
	}

	@Override
	public void close() throws Exception {
		super.close();
		// Nested try/finally so that a failure while closing one resource does not
		// leak the remaining ones; the first exception propagates.
		try {
			if (probeIter != null) {
				probeIter.close();
			}
		} finally {
			try {
				if (probeBuffer != null) {
					probeBuffer.close();
				}
			} finally {
				try {
					if (sorter != null) {
						sorter.close();
					}
				} finally {
					if (joinIterator != null) {
						joinIterator.close();
					}
				}
			}
		}
	}

	/**
	 * Drains the join iterator via the helper matching the join type. LEFT with a sorted
	 * left side and RIGHT with a sorted right side are not implemented yet.
	 */
	private void runJoin() throws Exception {
		if (type == FlinkJoinRelType.INNER) {
			helper.innerJoin((SortMergeInnerJoinIterator) joinIterator, leftNeedsSort);
		} else if (type == FlinkJoinRelType.LEFT) {
			if (leftNeedsSort) {
				throw new RuntimeException("Not support yet!");
			} else {
				helper.oneSideOuterJoin(
						(SortMergeOneSideOuterJoinIterator) joinIterator, false, rightNullRow);
			}
		} else if (type == FlinkJoinRelType.RIGHT) {
			if (leftNeedsSort) {
				helper.oneSideOuterJoin(
						(SortMergeOneSideOuterJoinIterator) joinIterator, true, leftNullRow);
			} else {
				throw new RuntimeException("Not support yet!");
			}
		}
	}

	/**
	 * Allocates a spillable external buffer backed by {@code memorySize} bytes of pages.
	 *
	 * @throws MemoryAllocationException if the pages cannot be allocated
	 */
	private ResettableExternalBuffer newBuffer(
			long memorySize, AbstractRowSerializer serializer) throws MemoryAllocationException {
		List<MemorySegment> externalBufferSegments = memManager.allocatePages(
				this.getContainingTask(), (int) (memorySize / memManager.getPageSize()));
		return new ResettableExternalBuffer(memManager, ioManager, externalBufferSegments, serializer);
	}

	/**
	 * Generated classes.
	 */
	protected static class CookedClasses {

		protected CookedClasses(
				Class<JoinConditionFunction> condFuncClass,
				Class<RecordComparator> keyComparatorClass,
				Class<Projection> projectionClass1,
				Class<Projection> projectionClass2,
				Class<NormalizedKeyComputer> computerClass,
				Class<RecordComparator> comparatorClass) {
			this.condFuncClass = condFuncClass;
			this.keyComparatorClass = keyComparatorClass;
			this.projectionClass1 = projectionClass1;
			this.projectionClass2 = projectionClass2;
			this.computerClass = computerClass;
			this.comparatorClass = comparatorClass;
		}

		protected final Class<JoinConditionFunction> condFuncClass;
		protected final Class<RecordComparator> keyComparatorClass;
		protected final Class<Projection> projectionClass1;
		protected final Class<Projection> projectionClass2;
		protected final Class<NormalizedKeyComputer> computerClass;
		protected final Class<RecordComparator> comparatorClass;
	}

	/**
	 * Iterator over the probe side: first drains the rows buffered while the sorted side
	 * was still running, then yields the single pending {@link #probeRow} (if any) set by
	 * the per-record join path.
	 */
	private class ProbeIterator implements MutableObjectIterator<BaseRow> {

		private ResettableExternalBuffer.BufferIterator bufferIterator;

		private ProbeIterator() {
			bufferIterator = probeBuffer.newIterator();
		}

		@Override
		public BaseRow next(BaseRow reuse) throws IOException {
			// Reuse is ignored; rows come from the buffer or the pending probe row.
			return next();
		}

		@Override
		public BaseRow next() throws IOException {
			if (bufferIterator.hasNext()) {
				bufferIterator.advanceNext();
				return bufferIterator.getRow();
			} else {
				if (probeRow == null) {
					return null;
				}
				// Hand out the pending row exactly once.
				BaseRow row = probeRow;
				probeRow = null;
				return row;
			}
		}

		public void close() {
			bufferIterator.close();
		}
	}
}