/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.hash;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.Channel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.util.MutableObjectIterator;
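/**
 * A {@link MutableHashTable} that can be re-opened with additional probe-side inputs while
 * reusing the build side created by the initial {@code open()} call. If partitions were
 * spilled during the build phase, the initial in-memory state is written to disk once and
 * restored before each additional probe phase.
 */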
public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT> {
/**
 * Enumerator for the channels to which in-memory partitions are spilled
 */
private final Channel.Enumerator spilledInMemoryPartitions;
/**
 * Stores the initial partitions, which are kept so that the table can be rebuilt for additional
 * probe phases. Spilled partitions reference the files that contain their contents.
 */
private List<HashPartition<BT, PT>> initialPartitions;
/**
 * The values of these variables are captured after the initial open() call.
 * They are required to restore the initial state before each additional probe phase.
 */
private int initialBucketCount;
private byte initialPartitionFanOut;
private boolean spilled = false;
public ReOpenableMutableHashTable(TypeSerializer<BT> buildSideSerializer,
TypeSerializer<PT> probeSideSerializer,
TypeComparator<BT> buildSideComparator,
TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> comparator,
List<MemorySegment> memorySegments, IOManager ioManager) {
super(buildSideSerializer, probeSideSerializer, buildSideComparator,
probeSideComparator, comparator, memorySegments, ioManager);
keepBuildSidePartitions = true;
spilledInMemoryPartitions = ioManager.createChannelEnumerator();
}
@Override
public void open(MutableObjectIterator<BT> buildSide,
MutableObjectIterator<PT> probeSide) throws IOException {
super.open(buildSide, probeSide);
initialPartitions = new ArrayList<HashPartition<BT, PT>>(partitionsBeingBuilt);
initialPartitionFanOut = (byte) partitionsBeingBuilt.size();
initialBucketCount = this.numBuckets;
}
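/**
 * Re-opens the hash table for another probe phase against the build side created by the
 * initial {@code open()} call, restoring the spilled in-memory partitions if necessary.
 *
 * @param probeInput the new probe-side input
 * @throws IOException if restoring the spilled partitions fails
 */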
public void reopenProbe(MutableObjectIterator<PT> probeInput) throws IOException {
if (this.closed.get()) {
throw new IllegalStateException("Cannot open probe input because hash join has already been closed");
}
partitionsBeingBuilt.clear();
probeIterator = new ProbeIterator<PT>(probeInput, probeSideSerializer.createInstance());
// we restore the same "partitionsBeingBuilt" state as after the initial open() call
partitionsBeingBuilt.addAll(initialPartitions);
if (spilled) {
this.currentRecursionDepth = 0;
initTable(initialBucketCount, initialPartitionFanOut);
// set up the partitions for insertion:
for (int i = 0; i < this.partitionsBeingBuilt.size(); i++) {
ReOpenableHashPartition<BT, PT> part = (ReOpenableHashPartition<BT, PT>) this.partitionsBeingBuilt.get(i);
if (part.isInMemory()) {
ensureNumBuffersReturned(part.initialPartitionBuffersCount);
part.restorePartitionBuffers(ioManager, availableMemory);
// now, index the partition through a hash table
final HashPartition<BT, PT>.PartitionIterator pIter = part.getPartitionIterator(this.buildSideComparator);
BT record = this.buildSideSerializer.createInstance();
while ((record = pIter.next(record)) != null) {
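// re-hash the stored hash code at recursion level 0, since the table is rebuilt in its initial form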
final int hashCode = hash(pIter.getCurrentHashCode(), 0);
final int posHashCode = hashCode % initialBucketCount;
final long pointer = pIter.getPointer();
// get the bucket for the given hash code
final int bucketArrayPos = posHashCode >> this.bucketsPerSegmentBits;
final int bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
final MemorySegment bucket = this.buckets[bucketArrayPos];
insertBucketEntry(part, bucket, bucketInSegmentPos, hashCode, pointer);
}
} else {
this.writeBehindBuffersAvailable--; // the partition is not in memory, so the probe-side buffer will take one write-behind buffer
if (this.writeBehindBuffers.size() == 0) { // prepareProbePhase always requires at least one buffer in the writeBehindBuffers queue
this.writeBehindBuffers.add(getNextBuffer());
this.writeBehindBuffersAvailable++;
}
part.prepareProbePhase(ioManager, currentEnumerator, writeBehindBuffers);
}
}
// spilled partitions are automatically added as pending partitions once the in-memory partitions have been handled
} else {
// the build input completely fits into memory, hence everything is still in memory.
for (int partIdx = 0; partIdx < partitionsBeingBuilt.size(); partIdx++) {
final HashPartition<BT, PT> p = partitionsBeingBuilt.get(partIdx);
p.prepareProbePhase(ioManager, currentEnumerator, writeBehindBuffers);
}
}
}
/**
 * Stores the initial hash table's contents on disk if the hash join needs the memory
 * for further partition processing.
 * The initial hash table is rebuilt before a new probe input is opened.
 *
 * For the sake of simplicity, each memory-resident partition's elements are written out to a file.
 * The files are read back and hashed into memory when a new probe input is opened.
 *
 * @throws IOException if spilling the in-memory partitions fails
 */
void storeInitialHashTable() throws IOException {
if (spilled) {
return; // the initial hash table is spilled only once; later calls are caused by deeper recursion levels
}
spilled = true;
for (int partIdx = 0; partIdx < initialPartitions.size(); partIdx++) {
final ReOpenableHashPartition<BT, PT> p = (ReOpenableHashPartition<BT, PT>) initialPartitions.get(partIdx);
if (p.isInMemory()) { // write memory resident partitions to disk
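// the buffers freed by spilling the partition become available as write-behind buffers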
this.writeBehindBuffersAvailable += p.spillInMemoryPartition(spilledInMemoryPartitions.next(), ioManager, writeBehindBuffers);
}
}
}
@Override
protected boolean prepareNextPartition() throws IOException {
// check if there will be further partition processing.
this.furtherPartitioning = false;
for (int i = 0; i < this.partitionsBeingBuilt.size(); i++) {
final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(i);
if (!p.isInMemory() && p.getProbeSideRecordCount() != 0) {
furtherPartitioning = true;
break;
}
}
if (furtherPartitioning) {
storeInitialHashTable();
}
return super.prepareNextPartition();
}
@Override
protected void releaseTable() {
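// keep the table in memory if it can be reused for another probe phase; release it only
// if partitions will be spilled for further processing or we are in a recursive sub-join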
if (furtherPartitioning || this.currentRecursionDepth > 0) {
super.releaseTable();
}
}
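/**
 * Creates a {@link ReOpenableHashPartition} instead of a plain {@link HashPartition}, so that
 * spilled in-memory partitions can later be restored for additional probe phases.
 */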
@Override
protected HashPartition<BT, PT> getNewInMemoryPartition(int number, int recursionLevel) {
return new ReOpenableHashPartition<BT, PT>(this.buildSideSerializer, this.probeSideSerializer,
number, recursionLevel, this.availableMemory.remove(this.availableMemory.size() - 1),
this, this.segmentSize);
}
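/**
 * Closes the hash table and releases all of its memory, including the memory held by the
 * initial partitions that are otherwise retained for additional probe phases.
 */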
@Override
public void close() {
if (partitionsBeingBuilt.size() == 0) {
// the partitions were cleared after the build phase, but we still need to release their memory
this.partitionsBeingBuilt.addAll(initialPartitions);
}
this.furtherPartitioning = true; // set so that releaseTable(), called from close(), actually releases the table
super.close();
}
}