/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.util;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentSource;
import org.apache.flink.runtime.io.disk.RandomAccessInputView;
import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
import org.apache.flink.runtime.memory.AbstractPagedInputView;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.util.BinaryRowUtil;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* A bytes-based hash table.
* It can be used, for example, to perform aggregations where the aggregated values are
* fixed-width.
*
* <p>The memory is divided into two areas:
*
* <p>- Bucket area: contains a pointer and a hash code per bucket.
* Bytes 0 to 8: a pointer to the record in the record area
* Bytes 8 to 16: the key's full 32-bit hash code
*
* <p>- Record area: contains the actual data as linked-list records.
* A BytesHashMap record has four parts:
* Bytes 0 to 4: len(k)
* Bytes 4 to 4 + len(k): key data
* Bytes 4 + len(k) to 8 + len(k): len(v)
* Bytes 8 + len(k) to 8 + len(k) + len(v): value data
*
* <p>{@code BytesHashMap} is influenced by Apache Spark's BytesToBytesMap.
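*
* <p>A minimal usage sketch. The names {@code owner}, {@code memoryManager}, {@code memorySize},
* {@code keyTypes}, {@code valueTypes}, {@code key} and {@code initialValue} are placeholders
* supplied by the surrounding operator, not part of this class:
* <pre>{@code
* BytesHashMap map = new BytesHashMap(owner, memoryManager, memorySize, keyTypes, valueTypes);
* BytesHashMap.LookupInfo info = map.lookup(key);
* // append only when the key is absent; append may throw EOFException when memory runs out
* BinaryRow value = info.isFound() ? info.getValue() : map.append(info, initialValue);
* map.free();
* }</pre>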
*/
public class BytesHashMap {
private static final Logger LOG = LoggerFactory.getLogger(BytesHashMap.class);
public static final int BUCKET_SIZE = 16;
public static final int RECORD_EXTRA_LENGTH = 8;
private static final int BUCKET_SIZE_BITS = 4;
private static final int ELEMENT_POINT_LENGTH = 8;
private static final long END_OF_LIST = Long.MAX_VALUE;
private static final int STEP_INCREMENT = 1;
private static final double LOAD_FACTOR = 0.75;
// A smaller bucket area makes better use of the L1/L2/L3 caches.
private static final long INIT_BUCKET_MEMORY_IN_BYTES = 1024 * 1024L;
private final Object owner;
private final int numBucketsPerSegment;
private final int numBucketsPerSegmentBits;
private final int numBucketsPerSegmentMask;
private final int lastBucketPosition;
private final int segmentSize;
/**
* The segments where the actual data is stored.
*/
private final RecordArea recordArea;
/**
* Set to true when valueTypeInfos.length == 0. In this case the BytesHashMap is effectively
* used as a HashSet: the value passed to
* {@link BytesHashMap#append(LookupInfo info, BinaryRow value)} is ignored when hashSetMode
* is set. The reusedValue then always points to an 8-byte MemorySegment that acts as each
* entry's value part, which keeps the BytesHashMap's spilling logic compatible.
*/
private final boolean hashSetMode;
/**
* Used to serialize hash map key and value into RecordArea's MemorySegments.
*/
private final BinaryRowSerializer valueSerializer;
private final BinaryRowSerializer keySerializer;
/**
* Reusable object returned by {@link #lookup(BinaryRow)}.
*/
private final LookupInfo reuseLookInfo;
private final MemoryManager memoryManager;
/**
* Reusable object for retrieving the map's value by key and for iteration.
*/
private BinaryRow reusedValue;
/**
* Reusable object for lookup and iteration.
*/
private BinaryRow reusedKey;
private final List<MemorySegment> freeMemorySegments;
private List<MemorySegment> bucketSegments;
private long numElements = 0;
private int numBucketsMask;
// The second hash code is derived from log2NumBuckets and numBucketsMask2.
private int log2NumBuckets;
private int numBucketsMask2;
/**
* The map will be expanded once the number of elements exceeds this threshold.
*/
private int growthThreshold;
private volatile RecordArea.DestructiveEntryIterator destructiveIterator = null;
private final int reservedNumBuffers;
private final int preferredNumBuffers;
private final int perRequestNumBuffers;
private final int initBucketSegmentNum;
private int allocatedFloatingNum;
// metrics
private long numSpillFiles;
private long spillInBytes;
public BytesHashMap(
final Object owner,
MemoryManager memoryManager,
long minMemorySize,
InternalType[] keyTypes,
InternalType[] valueTypes) {
this(owner, memoryManager, minMemorySize, minMemorySize, 0, keyTypes, valueTypes);
}
public BytesHashMap(
final Object owner,
MemoryManager memoryManager,
long minMemorySize,
long maxMemorySize,
long eachRequestMemorySize,
InternalType[] keyTypes,
InternalType[] valueTypes) {
this(owner, memoryManager, minMemorySize, maxMemorySize, eachRequestMemorySize, keyTypes, valueTypes, false);
}
public BytesHashMap(
final Object owner,
MemoryManager memoryManager,
long minMemorySize,
long maxMemorySize,
long eachRequestMemorySize,
InternalType[] keyTypes,
InternalType[] valueTypes,
boolean inferBucketMemory) {
this.owner = owner;
this.segmentSize = memoryManager.getPageSize();
this.reservedNumBuffers = (int) (minMemorySize / segmentSize);
this.preferredNumBuffers = (int) (maxMemorySize / segmentSize);
this.perRequestNumBuffers = (int) (eachRequestMemorySize / segmentSize);
this.memoryManager = memoryManager;
try {
this.freeMemorySegments = memoryManager.allocatePages(owner, reservedNumBuffers);
} catch (MemoryAllocationException e) {
throw new IllegalArgumentException("BytesHashMap can't allocate " + reservedNumBuffers + " pages", e);
}
this.numBucketsPerSegment = segmentSize / BUCKET_SIZE;
this.numBucketsPerSegmentBits = MathUtils.log2strict(this.numBucketsPerSegment);
this.numBucketsPerSegmentMask = (1 << this.numBucketsPerSegmentBits) - 1;
this.lastBucketPosition = (numBucketsPerSegment - 1) * BUCKET_SIZE;
checkArgument(keyTypes.length > 0);
this.keySerializer = new BinaryRowSerializer(keyTypes);
this.reusedKey = this.keySerializer.createInstance();
if (valueTypes.length == 0) {
this.valueSerializer = new BinaryRowSerializer();
this.hashSetMode = true;
this.reusedValue = new BinaryRow(0);
this.reusedValue.pointTo(MemorySegmentFactory.wrap(new byte[8]), 0, 8);
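// in hash-set mode the value is a fixed 8-byte empty row; it is kept so that the
// spilling logic can still read a (key, value) pair per entry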
LOG.info("BytesHashMap with hashSetMode = true.");
} else {
this.valueSerializer = new BinaryRowSerializer(valueTypes);
this.hashSetMode = false;
this.reusedValue = this.valueSerializer.createInstance();
}
this.reuseLookInfo = new LookupInfo();
this.recordArea = new RecordArea();
if (inferBucketMemory) {
this.initBucketSegmentNum = calcNumBucketSegments(keyTypes, valueTypes);
} else {
checkArgument(minMemorySize > INIT_BUCKET_MEMORY_IN_BYTES, "The minimum memory size must be larger than INIT_BUCKET_MEMORY_IN_BYTES!");
this.initBucketSegmentNum = MathUtils.roundDownToPowerOf2((int) (INIT_BUCKET_MEMORY_IN_BYTES / segmentSize));
}
// allocate and initialize MemorySegments for bucket area
initBucketSegments(initBucketSegmentNum);
LOG.info("BytesHashMap with initial memory segments {}, {} in bytes, init allocating {} for bucket area. And " +
"the preferred memory segments is {} in bytes, per request {} segments from floating memory pool.",
reservedNumBuffers, reservedNumBuffers * segmentSize, initBucketSegmentNum,
preferredNumBuffers * segmentSize, perRequestNumBuffers);
}
private int calcNumBucketSegments(InternalType[] keyTypes, InternalType[] valueTypes) {
int calcRecordLength = reusedValue.getFixedLengthPartSize() + BinaryRowUtil.getVariableLength(valueTypes) +
reusedKey.getFixedLengthPartSize() + BinaryRowUtil.getVariableLength(keyTypes);
// On average each record needs BUCKET_SIZE / LOAD_FACTOR bytes of bucket area,
// i.e. we aim to fill the bucket table up to the load factor (75%).
double averageBucketSize = BUCKET_SIZE / LOAD_FACTOR;
double fraction = averageBucketSize / (averageBucketSize + calcRecordLength + RECORD_EXTRA_LENGTH);
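// For example (assuming a 40-byte record): fraction = 21.33 / (21.33 + 40 + 8) ≈ 0.31,
// so roughly 31% of the reserved buffers go to the bucket area.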
// We make the number of buckets a power of 2 so that taking modulo is efficient.
// Round down to a power of 2 so that the requested bucket segments fit into the reserved memory.
int ret = Math.max(1, MathUtils.roundDownToPowerOf2((int) (reservedNumBuffers * fraction)));
// We can't handle more than Integer.MAX_VALUE buckets (e.g. because hash functions return int)
if ((long) ret * numBucketsPerSegment > Integer.MAX_VALUE) {
ret = MathUtils.roundDownToPowerOf2(Integer.MAX_VALUE / numBucketsPerSegment);
}
return ret;
}
// ----------------------- Public interface -----------------------
/**
* @return true when the BytesHashMap's valueTypeInfos.length == 0.
* In that case any appended value is ignored and replaced with the reusedValue, which
* serves as a presence tag.
*/
public boolean isHashSetMode() {
return hashSetMode;
}
/**
* @param key the key whose value to look up in the hash map.
* Only keys in BinaryRow form that are backed by a single MemorySegment are supported.
* @return {@link LookupInfo}
*/
public LookupInfo lookup(BinaryRow key) {
// check that the lookup key spans only one memory segment
checkArgument(key.getAllSegments().length == 1);
final int hashCode1 = key.hashCode();
int newPos = hashCode1 & numBucketsMask;
// which segment contains the bucket
int bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits;
// offset of the bucket in the segment
int bucketOffset = (newPos & numBucketsPerSegmentMask) << BUCKET_SIZE_BITS;
boolean found = false;
int step = STEP_INCREMENT;
long hashCode2 = 0;
long findElementPtr;
try {
do {
findElementPtr = bucketSegments.get(bucketSegmentIndex).getLong(bucketOffset);
if (findElementPtr == END_OF_LIST) {
// This is a new key.
break;
} else {
final int storedHashCode = bucketSegments.get(bucketSegmentIndex).getInt(
bucketOffset + ELEMENT_POINT_LENGTH);
if (hashCode1 == storedHashCode) {
recordArea.setReadPosition(findElementPtr);
if (recordArea.readKeyAndEquals(key)) {
// we found an element with a matching key, and not just a hash collision
found = true;
reusedValue = recordArea.readValue(reusedValue);
break;
}
}
}
if (step == 1) {
hashCode2 = calcSecondHashCode(hashCode1);
}
newPos = (int) ((hashCode1 + step * hashCode2) & numBucketsMask);
// which segment contains the bucket
bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits;
// offset of the bucket in the segment
bucketOffset = (newPos & numBucketsPerSegmentMask) << BUCKET_SIZE_BITS;
step += STEP_INCREMENT;
} while (true);
} catch (IOException ex) {
throw new RuntimeException(
"Error reading record from the aggregate map: " + ex.getMessage(), ex);
}
reuseLookInfo.set(
found, hashCode1, key, reusedValue, bucketSegmentIndex, bucketOffset);
return reuseLookInfo;
}
// M (the number of buckets) is a power of 2, so the second hash code must be odd, and is
// H2(K) = 1 + 2 * ((H1(K) / M) mod (M / 2))
private long calcSecondHashCode(final int firstHashCode) {
return ((((long) (firstHashCode >> log2NumBuckets)) & numBucketsMask2) << 1) + 1L;
}
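// Worked example (assuming 16 buckets, so log2NumBuckets = 4 and numBucketsMask2 = 7):
// for H1 = 42, H2 = ((42 >> 4) & 7) * 2 + 1 = 5 (odd, as required), and the probe
// sequence is (42 + step * 5) & 15 = 15, 4, 9, 14, ... for step = 1, 2, 3, ...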
/**
* Append a value to the hash map's record area.
*
* @return a BinaryRow mapping to the memory segments in the map's record area that belong to
* the newly appended value.
* @throws EOFException if the map cannot allocate any more memory.
*/
public BinaryRow append(LookupInfo info, BinaryRow value) throws IOException {
try {
if (numElements >= growthThreshold) {
growAndRehash();
//update info's bucketSegmentIndex and bucketOffset
lookup(info.key);
}
BinaryRow toAppend = hashSetMode ? reusedValue : value;
long pointerToAppended = recordArea.appendRecord(info.key, toAppend);
bucketSegments.get(info.bucketSegmentIndex).putLong(info.bucketOffset, pointerToAppended);
bucketSegments.get(info.bucketSegmentIndex).putInt(
info.bucketOffset + ELEMENT_POINT_LENGTH, info.keyHashCode);
numElements++;
recordArea.setReadPosition(pointerToAppended);
recordArea.skipKey();
return recordArea.readValue(reusedValue);
} catch (EOFException e) {
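// out of memory: update the spill metrics, assuming the caller will spill this
// map's record area to disk and retry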
numSpillFiles++;
spillInBytes += recordArea.segments.size() * ((long) segmentSize);
throw e;
}
}
public long getNumSpillFiles() {
return numSpillFiles;
}
public long getUsedMemoryInBytes() {
return (bucketSegments.size() + recordArea.segments.size()) * ((long) segmentSize);
}
public long getSpillInBytes() {
return spillInBytes;
}
public long getNumElements() {
return numElements;
}
private void initBucketSegments(int numBucketSegments) {
if (numBucketSegments < 1) {
throw new RuntimeException("Too small memory allocated for BytesHashMap");
}
this.bucketSegments = new ArrayList<>(numBucketSegments);
for (int i = 0; i < numBucketSegments; i++) {
bucketSegments.add(i, freeMemorySegments.remove(freeMemorySegments.size() - 1));
}
resetBucketSegments(this.bucketSegments);
int numBuckets = numBucketSegments * numBucketsPerSegment;
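// numBucketsMask maps a hash code to a bucket index; numBucketsMask2 covers half that
// range and is used to derive the second (odd) hash code for double hashing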
this.log2NumBuckets = MathUtils.log2strict(numBuckets);
this.numBucketsMask = (1 << MathUtils.log2strict(numBuckets)) - 1;
this.numBucketsMask2 = (1 << MathUtils.log2strict(numBuckets >> 1)) - 1;
this.growthThreshold = (int) (numBuckets * LOAD_FACTOR);
}
private List<MemorySegment> allocateSegments(int required) throws MemoryAllocationException {
if (reservedNumBuffers + allocatedFloatingNum + required >= preferredNumBuffers) {
//allocating these pages would exceed the owner's max memory size.
throw new MemoryAllocationException("Could not allocate " + required + " pages within the" +
" max memory size.");
} else {
List<MemorySegment> memorySegments = memoryManager.allocatePages(owner, required, false);
if (memorySegments.size() != required) {
throw new MemoryAllocationException(
"Could not allocate " + required + " pages within the max memory size.");
}
LOG.info("{} allocate {} floating segments successfully!", owner, required);
allocatedFloatingNum += memorySegments.size();
return memorySegments;
}
}
private void resetBucketSegments(List<MemorySegment> resetBucketSegs) {
for (MemorySegment segment: resetBucketSegs) {
for (int j = 0; j <= lastBucketPosition; j += BUCKET_SIZE) {
segment.putLong(j, END_OF_LIST);
}
}
}
/**
* @throws EOFException if a larger bucket area cannot be allocated.
*/
private void growAndRehash() throws EOFException {
// allocate the new data structures
int required = 2 * bucketSegments.size();
if ((long) required * numBucketsPerSegment > Integer.MAX_VALUE) {
LOG.warn("We can't handle more than Integer.MAX_VALUE buckets (e.g. because hash functions return int)");
throw new EOFException();
}
List<MemorySegment> newBucketSegments = new ArrayList<>(required);
try {
int freeNumSegments = freeMemorySegments.size();
int numAllocatedSegments = required - freeNumSegments;
if (numAllocatedSegments > 0) {
List<MemorySegment> returnSegments = allocateSegments(numAllocatedSegments);
newBucketSegments.addAll(returnSegments);
returnSegments.clear();
}
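// take the remaining required segments from the owned free segment list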
int needNumFromFreeSegments = required - newBucketSegments.size();
for (int end = needNumFromFreeSegments; end > 0; end--) {
newBucketSegments.add(freeMemorySegments.remove(freeMemorySegments.size() - 1));
}
int numBuckets = newBucketSegments.size() * numBucketsPerSegment;
this.log2NumBuckets = MathUtils.log2strict(numBuckets);
this.numBucketsMask = (1 << MathUtils.log2strict(numBuckets)) - 1;
this.numBucketsMask2 = (1 << MathUtils.log2strict(numBuckets >> 1)) - 1;
this.growthThreshold = (int) (numBuckets * LOAD_FACTOR);
} catch (MemoryAllocationException e) {
LOG.warn("BytesHashMap can't allocate {} pages, and now used {} pages", required, reservedNumBuffers +
allocatedFloatingNum, e);
throw new EOFException();
}
long reHashStartTime = System.currentTimeMillis();
resetBucketSegments(newBucketSegments);
// Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
for (MemorySegment memorySegment : bucketSegments) {
for (int j = 0; j < numBucketsPerSegment; j++) {
final long recordPointer = memorySegment.getLong(j * BUCKET_SIZE);
if (recordPointer != END_OF_LIST) {
final int hashCode1 = memorySegment.getInt(j * BUCKET_SIZE + ELEMENT_POINT_LENGTH);
int newPos = hashCode1 & numBucketsMask;
int bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits;
int bucketOffset = (newPos & numBucketsPerSegmentMask) << BUCKET_SIZE_BITS;
int step = STEP_INCREMENT;
long hashCode2 = 0;
while (newBucketSegments.get(bucketSegmentIndex).getLong(bucketOffset) != END_OF_LIST) {
if (step == 1) {
hashCode2 = calcSecondHashCode(hashCode1);
}
newPos = (int) ((hashCode1 + step * hashCode2) & numBucketsMask);
bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits;
bucketOffset = (newPos & numBucketsPerSegmentMask) << BUCKET_SIZE_BITS;
step += STEP_INCREMENT;
}
newBucketSegments.get(bucketSegmentIndex).putLong(bucketOffset, recordPointer);
newBucketSegments.get(bucketSegmentIndex).putInt(bucketOffset + ELEMENT_POINT_LENGTH, hashCode1);
}
}
}
LOG.info("The rehash take {} ms for {} segments", (System.currentTimeMillis() - reHashStartTime), required);
this.freeMemorySegments.addAll(this.bucketSegments);
this.bucketSegments = newBucketSegments;
}
/**
* Returns a destructive iterator over the entries of this map. It frees each page as it
* moves on to the next one. Note: it is illegal to call any method on the map after
* `destructiveIterator()` has been called.
* @return an entry iterator for iterating over the values appended to the hash map.
*/
public MutableObjectIterator<Entry> getEntryIterator() {
if (destructiveIterator != null) {
throw new IllegalArgumentException("DestructiveIterator is not null, so this method can't be invoke!");
}
return recordArea.destructiveEntryIterator();
}
/**
* @return the underlying memory segments of the hash map's record area
*/
public ArrayList<MemorySegment> getRecordAreaMemorySegments() {
return recordArea.segments;
}
public List<MemorySegment> getBucketAreaMemorySegments() {
return bucketSegments;
}
public List<MemorySegment> getFreeMemorySegments() {
return freeMemorySegments;
}
/**
* Release the memory segments of the map's record and bucket areas.
*/
public void free() {
free(false);
}
/**
* @param reserveFixedMemory whether to keep the reserved fixed memory instead of releasing it.
*/
public void free(boolean reserveFixedMemory) {
returnSegments(this.bucketSegments);
this.bucketSegments.clear();
recordArea.release();
releaseFloatingMemory();
if (!reserveFixedMemory) {
memoryManager.release(freeMemorySegments);
}
numElements = 0;
destructiveIterator = null;
}
/**
* Reset the memory segments of the map's record and bucket areas for reuse.
*/
public void reset() {
int numBuckets = bucketSegments.size() * numBucketsPerSegment;
this.log2NumBuckets = MathUtils.log2strict(numBuckets);
this.numBucketsMask = (1 << MathUtils.log2strict(numBuckets)) - 1;
this.numBucketsMask2 = (1 << MathUtils.log2strict(numBuckets >> 1)) - 1;
this.growthThreshold = (int) (numBuckets * LOAD_FACTOR);
//reset the record segments.
recordArea.reset();
resetBucketSegments(bucketSegments);
numElements = 0;
destructiveIterator = null;
LOG.info("reset BytesHashMap with record memory segments {}, {} in bytes, init allocating {} for bucket area.",
freeMemorySegments.size(), freeMemorySegments.size() * segmentSize, bucketSegments.size());
}
private void returnSegment(MemorySegment segment) {
freeMemorySegments.add(segment);
}
private void returnSegments(List<MemorySegment> segments) {
freeMemorySegments.addAll(segments);
}
private void releaseFloatingMemory() {
memoryManager.release(freeMemorySegments, false);
allocatedFloatingNum = 0;
}
// ----------------------- Record Area -----------------------
private final class RecordArea {
private final ArrayList<MemorySegment> segments = new ArrayList<>();
private final RandomAccessInputView inView;
private final SimpleCollectingOutputView outView;
RecordArea() {
this.outView = new SimpleCollectingOutputView(segments, new RecordAreaMemorySource(), segmentSize);
this.inView = new RandomAccessInputView(segments, segmentSize);
}
void release() {
returnSegments(segments);
segments.clear();
}
void reset() {
release();
// the output view will request new segments from freeMemorySegments again;
// reset its segment count and position within the current segment
outView.reset();
inView.setReadPosition(0);
}
// ----------------------- Memory Management -----------------------
private final class RecordAreaMemorySource implements MemorySegmentSource {
@Override
public MemorySegment nextSegment() {
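// prefer an already-owned free segment; otherwise request a batch of floating
// segments from the memory pool, bounded by the preferred number of buffers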
int s = freeMemorySegments.size();
if (s > 0) {
return freeMemorySegments.remove(s - 1);
} else if (reservedNumBuffers + allocatedFloatingNum < preferredNumBuffers) {
int requestNum = Math.min(perRequestNumBuffers, preferredNumBuffers - allocatedFloatingNum -
reservedNumBuffers);
try {
List<MemorySegment> allocateSegs = allocateSegments(requestNum);
freeMemorySegments.addAll(allocateSegs);
allocateSegs.clear();
return freeMemorySegments.remove(freeMemorySegments.size() - 1);
} catch (MemoryAllocationException e) {
LOG.warn("BytesHashMap can't allocate {} pages, and now used {} pages", perRequestNumBuffers,
reservedNumBuffers + allocatedFloatingNum, e);
return null;
}
} else {
return null;
}
}
}
// ----------------------- Append -----------------------
private long appendRecord(BinaryRow key, BinaryRow value) throws IOException {
final long oldLastPosition = outView.getCurrentOffset();
// serialize the key into the BytesHashMap record area
int skip = keySerializer.serializeToPages(key, outView);
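// the serializer may skip bytes so that the row's fixed-length part does not cross a
// segment boundary; the record therefore starts at oldLastPosition + skip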
// serialize the value into the BytesHashMap record area
valueSerializer.serializeToPages(value, outView);
return oldLastPosition + skip;
}
// ----------------------- Read -----------------------
void setReadPosition(long position) {
inView.setReadPosition(position);
}
boolean readKeyAndEquals(BinaryRow lookup) throws IOException {
reusedKey = keySerializer.mapFromPages(reusedKey, inView);
return lookup.equals(reusedKey);
}
/**
* @return the pointer to the skipped key's starting position in the record area
* @throws IOException if an invalid read position is accessed.
*/
long skipKey() throws IOException {
long index = inView.getReadPosition();
inView.skipBytes(inView.readInt());
return index;
}
BinaryRow readValue(BinaryRow reuse) throws IOException {
// relies on BinaryRowSerializer to handle any skipped (padding) bytes
// and to find the real start offset of the data
return valueSerializer.mapFromPages(reuse, inView);
}
// ----------------------- Iterator -----------------------
MutableObjectIterator<Entry> destructiveEntryIterator() {
return new RecordArea.DestructiveEntryIterator();
}
final class DestructiveEntryIterator extends AbstractPagedInputView implements MutableObjectIterator<Entry> {
private int count = 0;
private int currentSegmentIndex = 0;
public DestructiveEntryIterator() {
super(segments.get(0), segmentSize, 0);
destructiveIterator = this;
}
public boolean hasNext() {
return count < numElements;
}
@Override
public Entry next(Entry reuse) throws IOException {
if (hasNext()) {
count++;
// once consumed, the underlying segments are no longer needed (destructive iteration).
keySerializer.mapFromPages(reuse.getKey(), this);
valueSerializer.mapFromPages(reuse.getValue(), this);
return reuse;
} else {
return null;
}
}
@Override
public Entry next() throws IOException {
throw new UnsupportedOperationException("");
}
@Override
protected int getLimitForSegment(MemorySegment segment) {
return segmentSize;
}
@Override
protected MemorySegment nextSegment(MemorySegment current) throws EOFException, IOException {
return segments.get(++currentSegmentIndex);
}
}
}
/**
* Handle returned by the {@link BytesHashMap#lookup(BinaryRow)} method.
*/
public static final class LookupInfo {
private boolean found;
private BinaryRow key;
private BinaryRow value;
/**
* The hash code of the lookup key passed to {@link BytesHashMap#lookup(BinaryRow)}.
* Caching the hash code here allows us to avoid re-hashing the key when a value is
* inserted for that key. bucketSegmentIndex and bucketOffset are cached for the same reason.
*/
private int keyHashCode;
private int bucketSegmentIndex;
private int bucketOffset;
LookupInfo() {
this.found = false;
this.keyHashCode = -1;
this.key = null;
this.value = null;
this.bucketSegmentIndex = -1;
this.bucketOffset = -1;
}
void set(
boolean found,
int keyHashCode,
BinaryRow key,
BinaryRow value,
int bucketSegmentIndex, int bucketOffset) {
this.found = found;
this.keyHashCode = keyHashCode;
this.key = key;
this.value = value;
this.bucketSegmentIndex = bucketSegmentIndex;
this.bucketOffset = bucketOffset;
}
public boolean isFound() {
return found;
}
public BinaryRow getValue() {
return value;
}
}
/**
* An entry of the BytesHashMap, consisting of a key and a value.
*/
public static final class Entry {
private final BinaryRow key;
private final BinaryRow value;
public Entry(BinaryRow key, BinaryRow value) {
this.key = key;
this.value = value;
}
public BinaryRow getKey() {
return key;
}
public BinaryRow getValue() {
return value;
}
}
}