/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.join.batch.hashtable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.util.BitSet;
import org.apache.flink.table.codegen.JoinConditionFunction;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.util.BinaryRowUtil;
import org.apache.flink.table.runtime.join.batch.HashJoinType;
import org.apache.flink.table.runtime.join.batch.NullAwareJoinHelper;
import org.apache.flink.table.runtime.util.ChannelWithMeta;
import org.apache.flink.table.runtime.util.FileChannelUtil;
import org.apache.flink.table.runtime.util.PagedChannelReaderInputViewIterator;
import org.apache.flink.table.runtime.util.RowIterator;
import org.apache.flink.table.runtime.util.WrappedRowIterator;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MathUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* An implementation of a Hybrid Hash Join. The join starts operating in memory and gradually
* spills contents to disk when the memory is not sufficient. It does not need to know a
* priori how large the input will be.
*
* <p>The design of this class follows in many parts the design presented in
* "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al. In its current state,
* the implementation lacks features like dynamic role reversal, partition tuning, or histogram
* guided partitioning.</p>
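*
* <p>A typical interaction with this table, sketched below. The driver loop and the input
* handles are assumptions for illustration; all methods shown are defined in this class:
* <pre>{@code
* // build phase
* while (buildInput.hasNext()) {
*     table.putBuildRow(buildInput.next());
* }
* table.endBuild();
*
* // probe phase: tryProbe returns false when the row's target partition has spilled,
* // in which case the row is buffered internally and re-matched in the rebuild phase
* while (probeInput.hasNext()) {
*     if (table.tryProbe(probeInput.next())) {
*         RowIterator<BinaryRow> matches = table.getBuildSideIterator();
*         // ... emit joined rows for table.getCurrentProbeRow() ...
*     }
* }
*
* // rebuild phase: re-process spilled partitions until none are pending
* while (table.nextMatching()) {
*     // ... emit rows from table.getBuildSideIterator() ...
* }
* }</pre>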
*/
public class BinaryHashTable extends BaseHybridHashTable {
/**
* The utilities to serialize the build side data types.
*/
final BinaryRowSerializer binaryBuildSideSerializer;
private final AbstractRowSerializer originBuildSideSerializer;
/**
* The utilities to serialize the probe side data types.
*/
private final BinaryRowSerializer binaryProbeSideSerializer;
private final AbstractRowSerializer originProbeSideSerializer;
/**
* The utilities to hash and compare the build side data types.
*/
private final Projection<BaseRow, BinaryRow> buildSideProjection;
/**
* The utilities to hash and compare the probe side data types.
*/
private final Projection<BaseRow, BinaryRow> probeSideProjection;
final int bucketsPerSegment;
/**
* The number of hash table buckets in a single memory segment - 1.
* Because memory segments can be comparatively large, we fit multiple buckets into one memory
* segment.
* This variable is a mask that is 1 in the lower bits that define the position of a bucket
* within a segment.
*/
final int bucketsPerSegmentMask;
/**
* The number of bits that describe the position of a bucket in a memory segment. Computed as
* log2(bucketsPerSegment).
*/
final int bucketsPerSegmentBits;
/** Flag to enable/disable bloom filters for spilled partitions. */
final boolean useBloomFilters;
/**
* The partitions that are built by processing the current partition.
*/
final ArrayList<BinaryHashPartition> partitionsBeingBuilt;
/**
* BitSet used to mark whether an element (on the build side) has been successfully matched
* during the probe phase. As there are 9 elements in each bucket, we assign 2 bytes to the BitSet.
*/
final BitSet probedSet = new BitSet(2);
/**
* The partitions that have been spilled previously and are pending to be processed.
*/
private final ArrayList<BinaryHashPartition> partitionsPending;
private final JoinConditionFunction condFunc;
private final boolean reverseJoin;
/**
* The key positions for which null values should be filtered.
*/
private final int[] nullFilterKeys;
/**
* True if no keys need null filtering.
*/
private final boolean nullSafe;
/**
* True if nulls are filtered on all keys.
*/
private final boolean filterAllNulls;
/**
* Iterator over the elements in the hash table.
*/
LookupBucketIterator bucketIterator;
/**
* Iterator over the elements from the probe side.
*/
private ProbeIterator probeIterator;
final HashJoinType type;
private RowIterator<BinaryRow> buildIterator;
private boolean probeMatchedPhase = true;
private boolean buildIterVisited = false;
private BinaryRow probeKey;
private BaseRow probeRow;
BinaryRow reuseBuildRow;
public BinaryHashTable(
Configuration conf,
Object owner,
AbstractRowSerializer buildSideSerializer,
AbstractRowSerializer probeSideSerializer,
Projection<BaseRow, BinaryRow> buildSideProjection,
Projection<BaseRow, BinaryRow> probeSideProjection,
MemoryManager memManager,
long reservedMemorySize,
IOManager ioManager, int avgRecordLen, int buildRowCount,
boolean useBloomFilters, HashJoinType type,
JoinConditionFunction condFunc, boolean reverseJoin, boolean[] filterNulls,
boolean tryDistinctBuildRow) {
this(conf, owner, buildSideSerializer, probeSideSerializer, buildSideProjection, probeSideProjection, memManager,
reservedMemorySize, reservedMemorySize, 0, ioManager, avgRecordLen, buildRowCount, useBloomFilters, type, condFunc,
reverseJoin, filterNulls, tryDistinctBuildRow);
}
public BinaryHashTable(
Configuration conf,
Object owner,
AbstractRowSerializer buildSideSerializer,
AbstractRowSerializer probeSideSerializer,
Projection<BaseRow, BinaryRow> buildSideProjection,
Projection<BaseRow, BinaryRow> probeSideProjection,
MemoryManager memManager,
long reservedMemorySize,
long preferredMemorySize,
long perRequestMemorySize,
IOManager ioManager, int avgRecordLen, long buildRowCount,
boolean useBloomFilters, HashJoinType type,
JoinConditionFunction condFunc, boolean reverseJoin, boolean[] filterNulls,
boolean tryDistinctBuildRow) {
super(conf, owner, memManager, reservedMemorySize, preferredMemorySize, perRequestMemorySize,
ioManager, avgRecordLen, buildRowCount, !type.buildLeftSemiOrAnti() && tryDistinctBuildRow);
// assign the members
this.originBuildSideSerializer = buildSideSerializer;
this.binaryBuildSideSerializer = new BinaryRowSerializer(
buildSideSerializer.getTypes());
this.reuseBuildRow = binaryBuildSideSerializer.createInstance();
this.originProbeSideSerializer = probeSideSerializer;
this.binaryProbeSideSerializer = new BinaryRowSerializer(
originProbeSideSerializer.getTypes());
this.buildSideProjection = buildSideProjection;
this.probeSideProjection = probeSideProjection;
this.useBloomFilters = useBloomFilters;
this.type = type;
this.condFunc = condFunc;
this.reverseJoin = reverseJoin;
this.nullFilterKeys = NullAwareJoinHelper.getNullFilterKeys(filterNulls);
this.nullSafe = nullFilterKeys.length == 0;
this.filterAllNulls = nullFilterKeys.length == filterNulls.length;
this.bucketsPerSegment = this.segmentSize >> BinaryHashBucketArea.BUCKET_SIZE_BITS;
checkArgument(bucketsPerSegment != 0,
"Hash Table requires buffers of at least " + BinaryHashBucketArea.BUCKET_SIZE + " bytes.");
this.bucketsPerSegmentMask = bucketsPerSegment - 1;
this.bucketsPerSegmentBits = MathUtils.log2strict(bucketsPerSegment);
this.partitionsBeingBuilt = new ArrayList<>();
this.partitionsPending = new ArrayList<>();
createPartitions(initPartitionFanOut, 0);
}
// ========================== build phase public method ======================================
/**
* Put a build-side row into the hash table.
*/
public void putBuildRow(BaseRow row) throws IOException {
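// level-0 hash for the initial build pass; deeper recursion levels re-seed the hash so that
// rows of a spilled partition redistribute across new partitions (see buildTableFromSpilledPartition)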
final int hashCode = hash(this.buildSideProjection.apply(row).hashCode(), 0);
// TODO: combine key projection and build side conversion to code gen.
insertIntoTable(originBuildSideSerializer.baseRowToBinary(row), hashCode);
}
/**
* End build phase.
*/
public void endBuild() throws IOException {
// finalize the partitions
int buildWriteBuffers = 0;
for (BinaryHashPartition p : this.partitionsBeingBuilt) {
buildWriteBuffers += p.finalizeBuildPhase(this.ioManager, this.currentEnumerator);
}
buildSpillRetBufferNumbers += buildWriteBuffers;
// the first prober is the probe-side input, but the input is null at the beginning
this.probeIterator = new ProbeIterator(this.binaryProbeSideSerializer.createInstance());
// the bucket iterator can remain constant over the time
this.bucketIterator = new LookupBucketIterator(this);
}
// ========================== probe phase public method ======================================
/**
* Find matched build side rows for a probe row.
* @return false if the target partition has spilled; in that case this probe row is spilled
* too and will be re-matched in the rebuild phase.
*/
public boolean tryProbe(BaseRow record) throws IOException {
if (!this.probeIterator.hasSource()) {
// set the current probe value; the probe iterator has no source at the beginning
this.probeIterator.setInstance(record);
}
// calculate the hash
BinaryRow probeKey = probeSideProjection.apply(record);
final int hash = hash(probeKey.hashCode(), this.currentRecursionDepth);
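// the partition index uses the same hash % fan-out rule as insertIntoTable, so build and
// probe rows with equal keys land in the same partition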
BinaryHashPartition p = this.partitionsBeingBuilt.get(hash % partitionsBeingBuilt.size());
// for an in-memory partition, set the return iterators; otherwise spill the probe record
if (p.isInMemory()) {
this.probeKey = probeKey;
this.probeRow = record;
p.bucketArea.startLookup(hash);
return true;
} else {
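// the partition spilled during the build phase: consult its bloom filter to drop probe
// rows that cannot possibly match, and buffer the remaining rows for the rebuild phase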
if (p.testHashBloomFilter(hash)) {
BinaryRow row = originProbeSideSerializer.baseRowToBinary(record);
p.insertIntoProbeBuffer(row);
}
return false;
}
}
// ========================== rebuild phase public method ======================================
/**
* Advance to the next record from a rebuilt spilled partition or from the build-side outer partition.
*/
public boolean nextMatching() throws IOException {
if (type.needSetProbed()) {
return processProbeIter() || processBuildIter() || prepareNextPartition();
} else {
return processProbeIter() || prepareNextPartition();
}
}
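/**
 * Current probe row, or null once the table has switched to the build-side (outer) iteration.
 */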
public BaseRow getCurrentProbeRow() {
if (this.probeMatchedPhase) {
return this.probeIterator.current();
} else {
return null;
}
}
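/**
 * Build rows matching the current probe row, or the build-side outer iterator after the
 * probe phase is done.
 */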
public RowIterator<BinaryRow> getBuildSideIterator() {
return probeMatchedPhase ? bucketIterator : buildIterator;
}
// ================================ internal method ===========================================
/**
* Determines the number of buffers to be used for asynchronous write behind. It is currently
* computed as the logarithm of the number of buffers to the base 4, rounded up, minus 2.
* The upper limit for the number of write-behind buffers, however, is six.
*
* @param numBuffers The number of available buffers.
* @return The number of write-behind buffers to use.
*/
@VisibleForTesting
static int getNumWriteBehindBuffers(int numBuffers) {
int numIOBufs = (int) (Math.log(numBuffers) / Math.log(4) - 1.5);
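// for example: 64 buffers -> 1 write-behind buffer, 1024 -> 3, 4096 -> 4;
// the cap of 6 takes effect from 2^15 buffers upwards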
return numIOBufs > 6 ? 6 : numIOBufs;
}
private boolean processProbeIter() throws IOException {
// the prober's source is null at the beginning.
if (this.probeIterator.hasSource()) {
final ProbeIterator probeIter = this.probeIterator;
if (!this.probeMatchedPhase) {
return false;
}
BinaryRow next;
while ((next = probeIter.next()) != null) {
BinaryRow probeKey = probeSideProjection.apply(next);
final int hash = hash(probeKey.hashCode(), this.currentRecursionDepth);
final BinaryHashPartition p = this.partitionsBeingBuilt.get(hash % partitionsBeingBuilt.size());
// for an in-memory partition, set the return iterators; otherwise spill the probe record
if (p.isInMemory()) {
this.probeKey = probeKey;
this.probeRow = next;
p.bucketArea.startLookup(hash);
return true;
} else {
p.insertIntoProbeBuffer(next);
}
}
// -------------- partition done ---------------
return false;
} else {
return false;
}
}
private boolean processBuildIter() throws IOException {
if (this.buildIterVisited) {
return false;
}
this.probeMatchedPhase = false;
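// iterate all in-memory build rows once; the probed bit set selects which rows to emit:
// matched rows for a build-left semi join, unmatched rows for the other set-probed join
// types (inferred from the last argument of BuildSideIterator)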
this.buildIterator = new BuildSideIterator(
this.binaryBuildSideSerializer, reuseBuildRow,
this.partitionsBeingBuilt, probedSet, type.equals(HashJoinType.BUILD_LEFT_SEMI));
this.buildIterVisited = true;
return true;
}
private boolean prepareNextPartition() throws IOException {
// finalize and cleanup the partitions of the current table
for (final BinaryHashPartition p : this.partitionsBeingBuilt) {
p.finalizeProbePhase(this.availableMemory, this.partitionsPending, type.needSetProbed());
}
this.partitionsBeingBuilt.clear();
if (this.currentSpilledBuildSide != null) {
this.currentSpilledBuildSide.closeAndDelete();
this.currentSpilledBuildSide = null;
}
if (this.currentSpilledProbeSide != null) {
this.currentSpilledProbeSide.closeAndDelete();
this.currentSpilledProbeSide = null;
}
if (this.partitionsPending.isEmpty()) {
// no more data
return false;
}
// there are pending partitions
final BinaryHashPartition p = this.partitionsPending.get(0);
LOG.info(String.format("Begin to process spilled partition [%d]", p.getPartitionNumber()));
if (p.probeSideRecordCounter == 0) {
// unprobed spilled partitions are only re-processed for a build-side outer join;
// there is no need to create a hash table since there are no probe-side records
this.currentSpilledBuildSide = createInputView(p.getBuildSideChannel().getChannelID(),
p.getBuildSideBlockCount(), p.getLastSegmentLimit());
this.buildIterator = new WrappedRowIterator<>(
new PagedChannelReaderInputViewIterator<>(currentSpilledBuildSide, this.binaryBuildSideSerializer),
binaryBuildSideSerializer.createInstance());
this.partitionsPending.remove(0);
return true;
}
this.probeMatchedPhase = true;
this.buildIterVisited = false;
// build the next table; memory must be allocated after this call
buildTableFromSpilledPartition(p);
// set the probe side
ChannelWithMeta channelWithMeta = new ChannelWithMeta(
p.probeSideBuffer.getChannelID(),
p.probeSideBuffer.getBlockCount(),
p.probeNumBytesInLastSeg);
this.currentSpilledProbeSide = FileChannelUtil.createInputView(ioManager, channelWithMeta, new ArrayList<>(),
compressionEnable, compressionCodecFactory, compressionBlockSize, segmentSize);
ChannelReaderInputViewIterator<BinaryRow> probeReader = new ChannelReaderInputViewIterator<>(
this.currentSpilledProbeSide, this.binaryProbeSideSerializer);
this.probeIterator.set(probeReader);
this.probeIterator.setReuse(binaryProbeSideSerializer.createInstance());
// unregister the pending partition
this.partitionsPending.remove(0);
this.currentRecursionDepth = p.getRecursionLevel() + 1;
// recursively get the next
return nextMatching();
}
private void buildTableFromSpilledPartition(
final BinaryHashPartition p) throws IOException {
final int nextRecursionLevel = p.getRecursionLevel() + 1;
if (nextRecursionLevel == 2) {
LOG.info("Recursive hash join: partition number is " + p.getPartitionNumber());
} else if (nextRecursionLevel > MAX_RECURSION_DEPTH) {
throw new RuntimeException("Hash join exceeded maximum number of recursions, without reducing "
+ "partitions enough to be memory resident. Probable cause: Too many duplicate keys.");
}
if (p.getBuildSideBlockCount() > p.getProbeSideBlockCount()) {
LOG.info(String.format(
"Hash join: Partition(%d) " +
"build side block [%d] more than probe side block [%d]",
p.getPartitionNumber(),
p.getBuildSideBlockCount(),
p.getProbeSideBlockCount()));
}
// we distinguish two cases here:
// 1) The partition fits entirely into main memory. That is the case if we have enough buffers for
// all partition segments, plus enough buffers to hold the table structure.
// --> We read the partition in as it is and create a hashtable that references only
// that single partition.
// 2) We cannot guarantee that enough memory segments are available, so we read the partition
// in and distribute its data among newly created partitions.
final int totalBuffersAvailable = this.availableMemory.size() + this.buildSpillRetBufferNumbers;
if (totalBuffersAvailable != this.reservedNumBuffers + this.allocatedFloatingNum) {
throw new RuntimeException(String.format("Hash Join bug in memory management: Memory buffers leaked." +
" availableMemory(%s), buildSpillRetBufferNumbers(%s), reservedNumBuffers(%s), allocatedFloatingNum(%s)",
availableMemory.size(), buildSpillRetBufferNumbers, reservedNumBuffers, allocatedFloatingNum));
}
long numBuckets = p.getBuildSideRecordCount() / BinaryHashBucketArea.NUM_ENTRIES_PER_BUCKET + 1;
// we need to consider the worst case where everything hashes to one bucket, which then needs
// as many overflow buckets again. Also, one buffer needs to remain free for the probing
int maxBucketAreaBuffers = Math.max((int) (2 * (numBuckets / (this.bucketsPerSegmentMask + 1))), 1);
final long totalBuffersNeeded = maxBucketAreaBuffers + p.getBuildSideBlockCount() + 2;
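// needed: the bucket area, the partition's own data blocks, and two more buffers (at
// least one of which must remain free for probing, as noted above)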
if (totalBuffersNeeded < totalBuffersAvailable) {
LOG.info(String.format("Build in memory hash table from spilled partition [%d]", p.getPartitionNumber()));
// first read the partition in
final List<MemorySegment> partitionBuffers = readAllBuffers(p.getBuildSideChannel().getChannelID(), p.getBuildSideBlockCount());
BinaryHashBucketArea area = new BinaryHashBucketArea(this, (int) p.getBuildSideRecordCount(), maxBucketAreaBuffers);
final BinaryHashPartition newPart = new BinaryHashPartition(area, this.binaryBuildSideSerializer, this.binaryProbeSideSerializer,
0, nextRecursionLevel, partitionBuffers, p.getBuildSideRecordCount(), this.segmentSize, p.getLastSegmentLimit());
area.setPartition(newPart);
this.partitionsBeingBuilt.add(newPart);
// now, index the partition through a hash table
final BinaryHashPartition.PartitionIterator pIter = newPart.newPartitionIterator();
while (pIter.advanceNext()) {
final int hashCode = hash(buildSideProjection.apply(pIter.getRow()).hashCode(), nextRecursionLevel);
final int pointer = (int) pIter.getPointer();
area.insertToBucket(hashCode, pointer, false, true);
}
} else {
// go over the complete input and insert every element into the hash table
// compute into how many splits we'd need to partition the result
final int splits = (int) (totalBuffersNeeded / totalBuffersAvailable) + 1;
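// over-partition by a factor of 10 relative to the minimal split count so that each new
// partition is likely to fit in memory, bounded by the global and configured partition limits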
final int partitionFanOut = Math.min(Math.min(10 * splits, MAX_NUM_PARTITIONS), maxNumPartition());
createPartitions(partitionFanOut, nextRecursionLevel);
LOG.info(String.format("Build hybrid hash table from spilled partition [%d] with recursion level [%d]",
p.getPartitionNumber(), nextRecursionLevel));
ChannelReaderInputView inView = createInputView(p.getBuildSideChannel().getChannelID(), p.getBuildSideBlockCount(), p.getLastSegmentLimit());
final PagedChannelReaderInputViewIterator<BinaryRow> inIter =
new PagedChannelReaderInputViewIterator<>(inView, this.binaryBuildSideSerializer);
BinaryRow rec = this.binaryBuildSideSerializer.createInstance();
while ((rec = inIter.next(rec)) != null) {
final int hashCode = hash(this.buildSideProjection.apply(rec).hashCode(), nextRecursionLevel);
insertIntoTable(rec, hashCode);
}
inView.closeAndDelete();
// finalize the partitions
int buildWriteBuffers = 0;
for (BinaryHashPartition part : this.partitionsBeingBuilt) {
buildWriteBuffers += part.finalizeBuildPhase(this.ioManager, this.currentEnumerator);
}
buildSpillRetBufferNumbers += buildWriteBuffers;
}
}
private void insertIntoTable(final BinaryRow record, final int hashCode) throws IOException {
BinaryHashPartition p = partitionsBeingBuilt.get(hashCode % partitionsBeingBuilt.size());
if (p.isInMemory()) {
if (!p.bucketArea.appendRecordAndInsert(record, hashCode)) {
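// the record did not stay fully in memory (the partition spilled while appending), so
// remember its hash in the bloom filter for probe-side pre-filtering; this reading of
// appendRecordAndInsert's return value is an assumption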
p.addHashBloomFilter(hashCode);
}
} else {
p.insertIntoBuildBuffer(record);
p.addHashBloomFilter(hashCode);
}
}
private void createPartitions(int numPartitions, int recursionLevel) {
// sanity check
ensureNumBuffersReturned(numPartitions);
this.currentEnumerator = this.ioManager.createChannelEnumerator();
this.partitionsBeingBuilt.clear();
double numRecordPerPartition = (double) buildRowCount / numPartitions;
int maxBuffer = maxInitBufferOfBucketArea(numPartitions);
for (int i = 0; i < numPartitions; i++) {
BinaryHashBucketArea area = new BinaryHashBucketArea(this, numRecordPerPartition, maxBuffer);
BinaryHashPartition p = new BinaryHashPartition(area, this.binaryBuildSideSerializer,
this.binaryProbeSideSerializer, i, recursionLevel, getNotNullNextBuffer(), this, this.segmentSize,
compressionEnable, compressionCodecFactory, compressionBlockSize);
area.setPartition(p);
this.partitionsBeingBuilt.add(p);
}
}
/**
* This method clears all partitions currently residing (partially) in memory. It releases
* all memory and deletes all spilled partitions.
*
* <p>This method is intended for a hard cleanup in the case that the join is aborted.
*/
@Override
public void clearPartitions() {
// clear the iterators, so the next call to next() will notice
this.bucketIterator = null;
this.probeIterator = null;
for (int i = this.partitionsBeingBuilt.size() - 1; i >= 0; --i) {
final BinaryHashPartition p = this.partitionsBeingBuilt.get(i);
try {
p.clearAllMemory(this.availableMemory);
} catch (Exception e) {
LOG.error("Error during partition cleanup.", e);
}
}
this.partitionsBeingBuilt.clear();
// clear the partitions that are still to be done (that have files on disk)
for (final BinaryHashPartition p : this.partitionsPending) {
p.clearAllMemory(this.availableMemory);
}
}
/**
* Selects a partition and spills it. The number of the spilled partition is returned.
*
* @return The number of the spilled partition.
*/
@Override
protected int spillPartition() throws IOException {
// find the largest partition
int largestNumBlocks = 0;
int largestPartNum = -1;
for (int i = 0; i < partitionsBeingBuilt.size(); i++) {
BinaryHashPartition p = partitionsBeingBuilt.get(i);
if (p.isInMemory() && p.getNumOccupiedMemorySegments() > largestNumBlocks) {
largestNumBlocks = p.getNumOccupiedMemorySegments();
largestPartNum = i;
}
}
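// heuristic: the partition occupying the most memory segments frees the most memory per spill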
final BinaryHashPartition p = partitionsBeingBuilt.get(largestPartNum);
// spill the partition
int numBuffersFreed = p.spillPartition(this.ioManager,
this.currentEnumerator.next(), this.buildSpillReturnBuffers);
this.buildSpillRetBufferNumbers += numBuffersFreed;
LOG.info(String.format("Grace hash join: Ran out memory, choosing partition " +
"[%d] to spill, %d memory segments being freed",
largestPartNum, numBuffersFreed));
// grab as many buffers as are available directly
MemorySegment currBuff;
while (this.buildSpillRetBufferNumbers > 0 && (currBuff = this.buildSpillReturnBuffers.poll()) != null) {
this.availableMemory.add(currBuff);
this.buildSpillRetBufferNumbers--;
}
numSpillFiles++;
spillInBytes += numBuffersFreed * segmentSize;
// The bloom filter is built after the data is spilled, so that enough memory is available to build it.
p.buildBloomFilterAndFreeBucket();
return largestPartNum;
}
boolean applyCondition(BinaryRow candidate) throws Exception {
BinaryRow buildKey = buildSideProjection.apply(candidate);
// Both keys come from a Projection, so we can be sure they are backed by heap byte arrays.
boolean equal = buildKey.getSizeInBytes() == probeKey.getSizeInBytes()
&& BinaryRowUtil.byteArrayEquals(
buildKey.getMemorySegment().getHeapMemory(),
probeKey.getMemorySegment().getHeapMemory(),
buildKey.getSizeInBytes());
// TODO do null filter in advance?
if (!nullSafe) {
equal = equal && !(filterAllNulls ? buildKey.anyNull() : buildKey.anyNull(nullFilterKeys));
}
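// reverseJoin flips the argument order, presumably so the generated condition always sees
// its operands in the (left, right) order it was compiled for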
return condFunc == null ? equal : equal && (reverseJoin ? condFunc.apply(probeRow, candidate)
: condFunc.apply(candidate, probeRow));
}
}