// blob: 15afd9e595b80d5ce3c8b295637737c14a4e6639 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.join.batch.hashtable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.operators.util.BitSet;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.util.RowIterator;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import java.io.IOException;
import java.util.ArrayList;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * Iterates over the build-side rows of all partitions that are still in memory, returning either
 * only the rows whose probed flag is set or only the rows whose probed flag is clear, depending on
 * the {@code matchedOrUnmatched} switch given at construction time.
 */
public class BuildSideIterator implements RowIterator<BinaryRow> {

    private final BinaryRowSerializer accessor;
    private final ArrayList<BinaryHashPartition> partitionsBeingBuilt;
    private final BitSet probedSet;
    /** {@code true} selects rows flagged as probed, {@code false} selects the unflagged ones. */
    private final boolean matchedOrUnmatched;

    /** Position in {@link #partitionsBeingBuilt} currently being scanned; -1 before the first advance. */
    private int areaIndex;
    private BinaryRow reuse;
    /** Iterator over the partition that produced the most recent row; null until one is found. */
    private BucketIterator bucketIterator;

    BuildSideIterator(
            BinaryRowSerializer accessor,
            BinaryRow reuse,
            ArrayList<BinaryHashPartition> partitionsBeingBuilt,
            BitSet probedSet,
            boolean matchedOrUnmatched) {
        this.accessor = accessor;
        this.reuse = reuse;
        this.partitionsBeingBuilt = partitionsBeingBuilt;
        this.probedSet = probedSet;
        this.matchedOrUnmatched = matchedOrUnmatched;
        this.areaIndex = -1;
    }

    /**
     * Moves to the next qualifying row, first within the current partition and then in the
     * following in-memory partitions.
     *
     * @return {@code true} if a row is available through {@link #getRow()}, {@code false} once
     *     every partition has been scanned
     */
    @Override
    public boolean advanceNext() {
        // Fast path: the partition we are already scanning still has a qualifying row.
        if (bucketIterator != null && bucketIterator.advanceNext()) {
            return true;
        }

        // Otherwise step through the remaining partitions; partitions not in memory are skipped.
        for (areaIndex++; areaIndex < partitionsBeingBuilt.size(); areaIndex++) {
            final BinaryHashPartition candidate = partitionsBeingBuilt.get(areaIndex);
            if (!candidate.isInMemory()) {
                continue;
            }
            final BucketIterator candidateRows = new BucketIterator(
                    candidate.bucketArea, accessor, reuse, probedSet, matchedOrUnmatched);
            if (candidateRows.advanceNext()) {
                // Keep this iterator (and areaIndex) so the next call resumes in the same partition.
                bucketIterator = candidateRows;
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the row located by the last successful {@link #advanceNext()}. Must not be called
     * before {@link #advanceNext()} has returned {@code true} at least once, because no bucket
     * iterator exists until then.
     */
    @Override
    public BinaryRow getRow() {
        return bucketIterator.getRow();
    }

    /**
     * Iterator over the rows of a single bucket area, walking every bucket together with its
     * chain of overflow buckets.
     */
    public static class BucketIterator implements RowIterator<BinaryRow> {

        private BinaryHashBucketArea area;
        private final BinaryRowSerializer accessor;
        private final BitSet probedSet;
        private final boolean matchedOrUnmatched;

        // State describing the bucket (primary or overflow) that is currently being read.
        private MemorySegment bucketSegment;
        private MemorySegment[] overflowSegments;
        /** Index of the primary bucket being scanned; starts at -1 and is bumped before each use. */
        private int scanCount;
        private int bucketInSegmentOffset;
        /** Offset in {@link #bucketSegment} of the pointer belonging to the next entry to inspect. */
        private int pointerOffset;
        /** Entry count read from the header of the current bucket. */
        private int countInBucket;
        /** Number of entries of the current bucket that have already been inspected. */
        private int numInBucket;

        private BinaryRow reuse;
        /** Row found by the latest {@link #advanceNext()}, or null if that call found nothing. */
        private BinaryRow instance;

        BucketIterator(
                BinaryHashBucketArea area,
                BinaryRowSerializer accessor,
                BinaryRow reuse,
                BitSet probedSet,
                boolean matchedOrUnmatched) {
            this.area = area;
            this.accessor = accessor;
            this.reuse = reuse;
            this.probedSet = probedSet;
            this.matchedOrUnmatched = matchedOrUnmatched;
            this.scanCount = -1;
            // Position on the first bucket right away (no-op if the area has no buckets).
            moveToNextBucket();
        }

        /**
         * Looks for the next qualifying row in the current bucket chain and, when that chain is
         * exhausted, keeps moving on to the following buckets.
         *
         * @return {@code true} if a row was found, {@code false} when no bucket is left
         */
        @Override
        public boolean advanceNext() {
            do {
                instance = nextInBucket(reuse);
                if (instance != null) {
                    return true;
                }
                // Current chain is exhausted: try the next primary bucket, if any.
            } while (moveToNextBucket());
            return false;
        }

        /** Returns the row found by the latest {@link #advanceNext()}; null if it found none. */
        @Override
        public BinaryRow getRow() {
            return instance;
        }

        /**
         * Advances to the next primary bucket of the area and loads its state.
         *
         * @return {@code true} if the iterator is now positioned on a bucket, {@code false} if
         *     all buckets of the area have already been visited
         */
        private boolean moveToNextBucket() {
            if (++scanCount >= area.numBuckets) {
                return false;
            }
            // Split the bucket number into the segment holding it and the byte offset inside
            // that segment.
            final int segmentIndex = scanCount >> area.table.bucketsPerSegmentBits;
            final int offsetInSegment =
                    (scanCount & area.table.bucketsPerSegmentMask) << BinaryHashBucketArea.BUCKET_SIZE_BITS;
            final MemorySegment segment = area.buckets[segmentIndex];
            setBucket(segment, area.overflowSegments, offsetInSegment);
            return true;
        }

        /**
         * Points the iterator at the bucket stored at {@code offset} inside {@code segment} and
         * resets the per-bucket cursors, including the window of the shared probed-flag bit set.
         */
        private void setBucket(MemorySegment segment, MemorySegment[] overflow, int offset) {
            bucketSegment = segment;
            overflowSegments = overflow;
            bucketInSegmentOffset = offset;
            pointerOffset = offset + BinaryHashBucketArea.BUCKET_POINTER_START_OFFSET;
            countInBucket = segment.getShort(offset + BinaryHashBucketArea.HEADER_COUNT_OFFSET);
            numInBucket = 0;
            // Re-target the probed flags at the flag area of this bucket.
            probedSet.setMemorySegment(
                    bucketSegment, bucketInSegmentOffset + BinaryHashBucketArea.PROBED_FLAG_OFFSET);
        }

        /**
         * Scans the remainder of the current bucket and its overflow chain for the next entry
         * whose probed flag equals {@code matchedOrUnmatched}.
         *
         * @param reuse row instance handed to the serializer for deserialization
         * @return the deserialized row, or null if the chain holds no further qualifying entry
         * @throws RuntimeException wrapping the {@link IOException} if reading the row fails
         */
        private BinaryRow nextInBucket(BinaryRow reuse) {
            // Each pass of the outer loop handles one bucket of the chain (primary, then overflow).
            while (countInBucket != 0) {
                checkNotNull(bucketSegment);

                while (numInBucket < countInBucket) {
                    final boolean probed = probedSet.get(numInBucket);
                    numInBucket++;

                    if (probed != matchedOrUnmatched) {
                        // Not the kind of entry we are asked for: skip over its pointer.
                        pointerOffset += BinaryHashBucketArea.POINTER_LEN;
                        continue;
                    }

                    try {
                        area.partition.setReadPosition(bucketSegment.getInt(pointerOffset));
                        final BinaryRow row = accessor.mapFromPages(reuse, area.partition);
                        pointerOffset += BinaryHashBucketArea.POINTER_LEN;
                        return row;
                    } catch (IOException ioex) {
                        throw new RuntimeException("Error deserializing key or value from the hashtable: " +
                                ioex.getMessage(), ioex);
                    }
                }

                // This bucket is fully inspected; follow the forward pointer to an overflow bucket.
                final int forwardPointer = bucketSegment.getInt(
                        bucketInSegmentOffset + BinaryHashBucketArea.HEADER_FORWARD_OFFSET);
                if (forwardPointer == BinaryHashBucketArea.BUCKET_FORWARD_POINTER_NOT_SET) {
                    return null;
                }

                // The forward pointer encodes the overflow segment index in its upper bits and
                // the offset inside that segment in its lower bits.
                final int overflowSegIndex = forwardPointer >>> area.table.segmentSizeBits;
                final int overflowOffset = forwardPointer & area.table.segmentSizeMask;
                setBucket(overflowSegments[overflowSegIndex], overflowSegments, overflowOffset);
            }
            return null;
        }
    }
}