/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.groupby.epinephelinae;

import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
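/**
 * An open-addressing hash table stored entirely in a {@link ByteBuffer}, using linear probing for
 * collision resolution. Each bucket holds a 4-byte key hash (with a "used" flag packed into its high
 * bit) followed by the key bytes; any remaining bytes in the bucket are available to callers for
 * per-bucket values. The table grows by doubling within the buffer via {@link #adjustTableWhenFull()},
 * with the final growth restarting at offset zero so it can use the entire arena.
 *
 * <p>A minimal usage sketch (all sizes here are illustrative, not recommendations):
 *
 * <pre>{@code
 * int keySize = 8;                          // hypothetical fixed-width key
 * int bucketSize = Integer.BYTES + keySize; // hash + key, no extra value space
 * ByteBufferHashTable table = new ByteBufferHashTable(
 *     0.75f,                                // max load factor
 *     1024,                                 // initial buckets
 *     bucketSize,
 *     ByteBuffer.allocate(1 << 20),         // 1 MiB arena
 *     keySize,
 *     Integer.MAX_VALUE,                    // no artificial size cap
 *     null                                  // no bucket update handler
 * );
 * table.reset();                            // must be called before first use
 * }</pre>
 */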
public class ByteBufferHashTable
{
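/**
 * Computes the size of the table arena when each bucket carries {@code perBucketAdditionalSize}
 * bytes of storage elsewhere in the buffer: the bucket count is what fits when every bucket is
 * charged its additional size, and the arena is that count times {@code bucketSize}.
 *
 * <p>Worked example: with {@code bufferCapacity = 1_000_000}, {@code bucketSize = 12} and
 * {@code perBucketAdditionalSize = 4}, this yields {@code (1_000_000 / 16) * 12 = 750_000}.
 */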
public static int calculateTableArenaSizeWithPerBucketAdditionalSize(
int bufferCapacity,
int bucketSize,
int perBucketAdditionalSize
)
{
return (bufferCapacity / (bucketSize + perBucketAdditionalSize)) * bucketSize;
}
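/**
 * Computes the size of the table arena when a fixed number of bytes is reserved elsewhere in the
 * buffer: the reservation is subtracted up front, and the remainder is rounded down to a whole
 * number of buckets.
 *
 * <p>Worked example: with {@code bufferCapacity = 1_000_000}, {@code bucketSize = 12} and
 * {@code fixedAdditionalSize = 100_000}, this yields {@code (900_000 / 12) * 12 = 900_000}.
 */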
public static int calculateTableArenaSizeWithFixedAdditionalSize(
int bufferCapacity,
int bucketSize,
int fixedAdditionalSize
)
{
return ((bufferCapacity - fixedAdditionalSize) / bucketSize) * bucketSize;
}
protected static final int HASH_SIZE = Integer.BYTES;

protected final int maxSizeForTesting; // Integer.MAX_VALUE in production, only used for unit tests

protected final float maxLoadFactor;
protected final int initialBuckets;
protected final ByteBuffer buffer;
// Full size of each bucket, including the leading hash/used-flag int
protected final int bucketSizeWithHash;
// Size of the table arena; set to the full buffer capacity
protected final int tableArenaSize;
protected final int keySize;
protected int tableStart;
// Buffer pointing to the current table (it moves around as the table grows)
protected ByteBuffer tableBuffer;
// Number of elements in the table right now
protected int size;
// Maximum number of elements in the table before it must be resized
// This value changes when the table is resized.
protected int regrowthThreshold;
// Number of buckets in the current table
// This value changes when the table is resized.
protected int maxBuckets;
// Number of times the table has filled up and been regrown (through adjustTableWhenFull())
protected int growthCount;
@Nullable
protected BucketUpdateHandler bucketUpdateHandler;
public ByteBufferHashTable(
float maxLoadFactor,
int initialBuckets,
int bucketSizeWithHash,
ByteBuffer buffer,
int keySize,
int maxSizeForTesting,
@Nullable BucketUpdateHandler bucketUpdateHandler
)
{
this.maxLoadFactor = maxLoadFactor;
this.initialBuckets = initialBuckets;
this.bucketSizeWithHash = bucketSizeWithHash;
this.buffer = buffer;
this.keySize = keySize;
this.maxSizeForTesting = maxSizeForTesting;
this.tableArenaSize = buffer.capacity();
this.bucketUpdateHandler = bucketUpdateHandler;
}
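/**
 * Clears the table and positions it within the buffer. The table initially starts part-way through
 * the arena so that successive doublings can grow upward, leaving the final growth free to restart
 * from offset zero and use the entire arena. Must be called before the table is first used.
 */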
public void reset()
{
size = 0;
maxBuckets = Math.min(tableArenaSize / bucketSizeWithHash, initialBuckets);
regrowthThreshold = maxSizeForBuckets(maxBuckets);
if (maxBuckets < 1) {
throw new IAE(
"Not enough capacity for even one row! Need[%,d] but have[%,d].",
bucketSizeWithHash + Integer.BYTES,
buffer.capacity()
);
}
// Start table part-way through the buffer so the last growth can start from zero and thereby use more space.
tableStart = tableArenaSize - maxBuckets * bucketSizeWithHash;
int nextBuckets = maxBuckets * 2;
while (true) {
long nextBucketsSize = (long) nextBuckets * bucketSizeWithHash;
if (nextBucketsSize > Integer.MAX_VALUE) {
break;
}
final int nextTableStart = tableStart - nextBuckets * bucketSizeWithHash;
if (nextTableStart > tableArenaSize / 2) {
tableStart = nextTableStart;
nextBuckets = nextBuckets * 2;
} else {
break;
}
}
if (tableStart < tableArenaSize / 2) {
tableStart = 0;
}
final ByteBuffer bufferDup = buffer.duplicate();
bufferDup.position(tableStart);
bufferDup.limit(tableStart + maxBuckets * bucketSizeWithHash);
tableBuffer = bufferDup.slice();
// Clear used bits of new table
for (int i = 0; i < maxBuckets; i++) {
tableBuffer.put(i * bucketSizeWithHash, (byte) 0);
}
}
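/**
 * Doubles the table if possible, rehashing all existing entries into the new table. The new table is
 * placed immediately after the current one when there is room to grow upward; otherwise it restarts
 * at offset zero, occupying everything below the current table. Once the table starts at offset zero,
 * no further growth is possible and this method is a no-op. If a {@link BucketUpdateHandler} is set,
 * it is notified before the swap and for each moved bucket.
 */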
public void adjustTableWhenFull()
{
if (tableStart == 0) {
// tableStart = 0 is the last growth; no further growing is possible.
return;
}
final int newBuckets;
final int newMaxSize;
final int newTableStart;
if (((long) maxBuckets * 3 * bucketSizeWithHash) > (long) tableArenaSize - tableStart) {
// Not enough space to grow upwards, start back from zero
newTableStart = 0;
newBuckets = tableStart / bucketSizeWithHash;
newMaxSize = maxSizeForBuckets(newBuckets);
} else {
newTableStart = tableStart + tableBuffer.limit();
newBuckets = maxBuckets * 2;
newMaxSize = maxSizeForBuckets(newBuckets);
}
if (newBuckets < maxBuckets) {
throw new ISE("newBuckets[%,d] < maxBuckets[%,d]", newBuckets, maxBuckets);
}
ByteBuffer newTableBuffer = buffer.duplicate();
newTableBuffer.position(newTableStart);
newTableBuffer.limit(newTableStart + newBuckets * bucketSizeWithHash);
newTableBuffer = newTableBuffer.slice();
int newSize = 0;
// Clear used bits of new table
for (int i = 0; i < newBuckets; i++) {
newTableBuffer.put(i * bucketSizeWithHash, (byte) 0);
}
// Loop over old buckets and copy to new table
final ByteBuffer entryBuffer = tableBuffer.duplicate();
final ByteBuffer keyBuffer = tableBuffer.duplicate();
int oldBuckets = maxBuckets;
if (bucketUpdateHandler != null) {
bucketUpdateHandler.handlePreTableSwap();
}
for (int oldBucket = 0; oldBucket < oldBuckets; oldBucket++) {
if (isBucketUsed(oldBucket)) {
int oldBucketOffset = oldBucket * bucketSizeWithHash;
entryBuffer.limit((oldBucket + 1) * bucketSizeWithHash);
entryBuffer.position(oldBucketOffset);
keyBuffer.limit(entryBuffer.position() + HASH_SIZE + keySize);
keyBuffer.position(entryBuffer.position() + HASH_SIZE);
// Mask off the used flag (high bit) to recover the original key hash
final int keyHash = entryBuffer.getInt(entryBuffer.position()) & 0x7fffffff;
final int newBucket = findBucket(true, newBuckets, newTableBuffer, keyBuffer, keyHash);
if (newBucket < 0) {
throw new ISE("Couldn't find a bucket while resizing");
}
final int newBucketOffset = newBucket * bucketSizeWithHash;
newTableBuffer.position(newBucketOffset);
newTableBuffer.put(entryBuffer);
newSize++;
if (bucketUpdateHandler != null) {
bucketUpdateHandler.handleBucketMove(oldBucketOffset, newBucketOffset, tableBuffer, newTableBuffer);
}
}
}
maxBuckets = newBuckets;
regrowthThreshold = newMaxSize;
tableBuffer = newTableBuffer;
tableStart = newTableStart;
growthCount++;
if (size != newSize) {
throw new ISE("size[%,d] != newSize[%,d] after resizing", size, newSize);
}
}
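/**
 * Claims an unused bucket for a key: writes the key hash with the "used" flag set in its high bit,
 * copies the key bytes in after it, and increments the table size. Notifies the
 * {@link BucketUpdateHandler}, if any, of the new bucket.
 */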
protected void initializeNewBucketKey(
final int bucket,
final ByteBuffer keyBuffer,
final int keyHash
)
{
int offset = bucket * bucketSizeWithHash;
tableBuffer.position(offset);
tableBuffer.putInt(Groupers.getUsedFlag(keyHash));
tableBuffer.put(keyBuffer);
size++;
if (bucketUpdateHandler != null) {
bucketUpdateHandler.handleNewBucket(offset);
}
}
/**
* Find a bucket for a key, attempting to grow the table with adjustTableWhenFull() if possible.
*
* @param keyBuffer buffer containing the key
* @param keyHash hash of the key
* @param preTableGrowthRunnable runnable that executes before the table grows
*
* @return index of the found bucket, or -1 if a bucket could not be allocated even after resizing.
*/
protected int findBucketWithAutoGrowth(
final ByteBuffer keyBuffer,
final int keyHash,
final Runnable preTableGrowthRunnable
)
{
int bucket = findBucket(canAllowNewBucket(), maxBuckets, tableBuffer, keyBuffer, keyHash);
if (bucket < 0) {
if (size < maxSizeForTesting) {
preTableGrowthRunnable.run();
adjustTableWhenFull();
bucket = findBucket(size < regrowthThreshold, maxBuckets, tableBuffer, keyBuffer, keyHash);
}
}
return bucket;
}
/**
 * Finds the bucket into which we should insert a key, probing linearly from the key's hash.
 *
 * @param allowNewBucket whether an unused bucket may be claimed for this key
 * @param buckets number of buckets in the table being searched
 * @param targetTableBuffer table buffer to search; must be selectable, since when resizing the hash
 * table, findBucket() is used on the newly allocated table buffer
 * @param keyBuffer key, must have exactly keySize bytes remaining. Will not be modified.
 * @param keyHash hash of the key; must be non-negative
 *
 * @return bucket index for this key, or -1 if no bucket is available due to being full
 */
protected int findBucket(
final boolean allowNewBucket,
final int buckets,
final ByteBuffer targetTableBuffer,
final ByteBuffer keyBuffer,
final int keyHash
)
{
// startBucket will never be negative since keyHash is always positive (see Groupers.hash)
final int startBucket = keyHash % buckets;
int bucket = startBucket;
outer:
while (true) {
final int bucketOffset = bucket * bucketSizeWithHash;
if ((targetTableBuffer.get(bucketOffset) & 0x80) == 0) {
// Found unused bucket before finding our key
return allowNewBucket ? bucket : -1;
}
for (int i = bucketOffset + HASH_SIZE, j = keyBuffer.position(); j < keyBuffer.position() + keySize; i++, j++) {
if (targetTableBuffer.get(i) != keyBuffer.get(j)) {
bucket += 1;
if (bucket == buckets) {
bucket = 0;
}
if (bucket == startBucket) {
// Came back around to the start without finding a free slot, that was a long trip!
// Should never happen unless buckets == regrowthThreshold.
return -1;
}
continue outer;
}
}
// Found our key in a used bucket
return bucket;
}
}
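/**
 * Whether a new bucket may be claimed without first growing the table, respecting both the load
 * factor (via {@link #maxSizeForBuckets}) and the testing size cap.
 */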
protected boolean canAllowNewBucket()
{
return size < Math.min(regrowthThreshold, maxSizeForTesting);
}
protected int getOffsetForBucket(int bucket)
{
return bucket * bucketSizeWithHash;
}
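/**
 * Maximum number of entries allowed for the given bucket count under the configured load factor;
 * always at least 1, so that even a tiny table can hold one entry.
 */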
protected int maxSizeForBuckets(int buckets)
{
return Math.max(1, (int) (buckets * maxLoadFactor));
}
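/**
 * Whether the bucket's "used" flag (the high bit of its first byte, set via
 * {@link Groupers#getUsedFlag}) is on.
 */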
protected boolean isBucketUsed(final int bucket)
{
return (tableBuffer.get(bucket * bucketSizeWithHash) & 0x80) == 0x80;
}
protected boolean isOffsetUsed(final int bucketOffset)
{
return (tableBuffer.get(bucketOffset) & 0x80) == 0x80;
}
public ByteBuffer getTableBuffer()
{
return tableBuffer;
}
public int getSize()
{
return size;
}
public int getRegrowthThreshold()
{
return regrowthThreshold;
}
public int getMaxBuckets()
{
return maxBuckets;
}
public int getGrowthCount()
{
return growthCount;
}
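/**
 * Callback interface for observing bucket lifecycle events: claiming a new bucket, the swap to a
 * resized table, and individual bucket moves during a resize.
 */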
public interface BucketUpdateHandler
{
/** Called after a key is written into a previously unused bucket. */
void handleNewBucket(int bucketOffset);

/** Called once, before existing entries are rehashed into the new table during a resize. */
void handlePreTableSwap();

/** Called for each entry copied from the old table to the new one during a resize. */
void handleBucketMove(int oldBucketOffset, int newBucketOffset, ByteBuffer oldBuffer, ByteBuffer newBuffer);
}
}