blob: 1eb9105d57bd12af05b89a9f40c42a3c5b591a91 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.sort;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* In memory sort buffer for binary row.
*/
/**
 * In-memory sort buffer for {@link BinaryRow}s.
 *
 * <p>Each written record is serialized into a chain of record-buffer pages, and an index entry
 * (record pointer, plus the normalized key handled by {@link BinaryIndexedSortable}) is appended
 * to the sort index. Pages are obtained from a {@link DynamicMemorySegmentPool}. Any "floating"
 * pages that pool reports on {@link #reset()} / {@link #dispose()} are released back to the
 * {@link MemoryManager} (presumably pages requested beyond the initial memory — see
 * {@code DynamicMemorySegmentPool}).
 */
public final class BinaryInMemorySortBuffer extends BinaryIndexedSortable {

	private static final Logger LOG = LoggerFactory.getLogger(BinaryInMemorySortBuffer.class);

	/** Minimum number of memory segments that must be handed to {@link #createBuffer}. */
	private static final int MIN_REQUIRED_BUFFERS = 3;

	/** Serializer used to write incoming rows into the record buffer pages. */
	private final TypeSerializer<BaseRow> inputSerializer;

	/** Pages holding serialized record data; shared with {@link #recordCollector} and the parent. */
	private final ArrayList<MemorySegment> recordBufferSegments;

	/** Output view that appends serialized records to {@link #recordBufferSegments}. */
	private final SimpleCollectingOutputView recordCollector;

	/** Number of memory segments initially handed to this buffer; basis of {@link #getCapacity()}. */
	private final int totalNumBuffers;

	/**
	 * The initially provided memory list. {@link MemoryManager#release} is invoked on it when
	 * floating pages are returned, and it is expected to shrink by exactly that many pages.
	 */
	private final List<MemorySegment> segments;

	/** Offset in the record buffer just past the last successfully written record. */
	private long currentDataBufferOffset;

	/** Number of sort-index bytes occupied by the records written since the last reset. */
	private long sortIndexBytes;

	/** Memory manager that floating pages are released to. */
	private final MemoryManager memoryManager;

	/**
	 * Creates a memory sorter that records are inserted into one by one via {@link #write}.
	 *
	 * @param memoryManager memory manager used by the pool and to release floating pages
	 * @param normalizedKeyComputer computer for the normalized keys stored in the sort index
	 * @param inputSerializer serializer used to write incoming rows into the record buffer
	 * @param serializer serializer used to read rows back out of the record buffer
	 * @param comparator comparator for full record comparison
	 * @param memory initial memory segments; must contain at least {@value #MIN_REQUIRED_BUFFERS}
	 * @param additionalLimitNumPages forwarded to {@code DynamicMemorySegmentPool}
	 *     (presumably the cap on extra floating pages — confirm there)
	 * @param perRequestNumBuffers forwarded to {@code DynamicMemorySegmentPool}
	 *     (presumably the number of pages requested at a time — confirm there)
	 * @return a new, empty sort buffer
	 * @throws IllegalArgumentException if fewer than {@value #MIN_REQUIRED_BUFFERS} segments are given
	 * @throws IOException propagated from constructing the sortable
	 */
	public static BinaryInMemorySortBuffer createBuffer(
			MemoryManager memoryManager,
			NormalizedKeyComputer normalizedKeyComputer,
			TypeSerializer<BaseRow> inputSerializer,
			BinaryRowSerializer serializer,
			RecordComparator comparator,
			List<MemorySegment> memory,
			int additionalLimitNumPages,
			int perRequestNumBuffers) throws IOException {
		checkArgument(memory.size() >= MIN_REQUIRED_BUFFERS,
				"Need at least %s memory segments, but got %s.", MIN_REQUIRED_BUFFERS, memory.size());
		int totalNumBuffers = memory.size();
		DynamicMemorySegmentPool pool = new DynamicMemorySegmentPool(memoryManager, memory,
				perRequestNumBuffers, additionalLimitNumPages);
		ArrayList<MemorySegment> recordBufferSegments = new ArrayList<>(16);
		return new BinaryInMemorySortBuffer(
				memoryManager, memory,
				normalizedKeyComputer, inputSerializer, serializer, comparator, recordBufferSegments,
				new SimpleCollectingOutputView(recordBufferSegments, pool, pool.pageSize()),
				pool, totalNumBuffers);
	}

	private BinaryInMemorySortBuffer(
			MemoryManager memoryManager,
			List<MemorySegment> segments,
			NormalizedKeyComputer normalizedKeyComputer,
			TypeSerializer<BaseRow> inputSerializer,
			BinaryRowSerializer serializer,
			RecordComparator comparator,
			ArrayList<MemorySegment> recordBufferSegments,
			SimpleCollectingOutputView recordCollector,
			MemorySegmentPool pool,
			int totalNumBuffers) throws IOException {
		// the record buffer list is shared: the parent reads from it, the collector writes to it
		super(normalizedKeyComputer, serializer, comparator, recordBufferSegments, pool);
		this.memoryManager = memoryManager;
		this.segments = segments;
		this.inputSerializer = inputSerializer;
		this.recordBufferSegments = recordBufferSegments;
		this.recordCollector = recordCollector;
		this.totalNumBuffers = totalNumBuffers;
	}

	// -------------------------------------------------------------------------
	// Memory Segment
	// -------------------------------------------------------------------------

	/**
	 * Resets the sort buffer back to the state where it is empty. All contained data is discarded.
	 *
	 * <p>All index and record pages go back to the pool, floating pages are released to the memory
	 * manager, and a fresh first sort-index segment is grabbed so the buffer is writable again.
	 */
	public void reset() {
		// reset all offsets
		this.numRecords = 0;
		this.currentSortIndexOffset = 0;
		this.currentDataBufferOffset = 0;
		this.sortIndexBytes = 0;

		// return all memory
		returnToSegmentPool();

		// release floating memory in advance
		releaseFloatingMemory();

		// grab first buffers
		this.currentSortIndexSegment = nextMemorySegment();
		this.sortIndex.add(this.currentSortIndexSegment);
		this.recordCollector.reset();
	}

	/**
	 * Asks the pool how many floating pages it is giving up and, if any, releases them to the
	 * memory manager. The release is verified to have removed exactly that many segments from
	 * {@link #segments}.
	 */
	private void releaseFloatingMemory() {
		int releaseNum = ((DynamicMemorySegmentPool) memorySegmentPool).resetAndReturnFloatingNum();
		if (releaseNum > 0) {
			int beforeReleaseNum = segments.size();
			memoryManager.release(segments, false);
			int actualReleased = beforeReleaseNum - segments.size();
			Preconditions.checkArgument(releaseNum == actualReleased,
					"Expected to release %s floating pages, but released %s.", releaseNum, actualReleased);
			LOG.info("release {} floating pages in advance.", releaseNum);
		}
	}

	/**
	 * Returns all sort-index and record-buffer pages to the segment pool and clears both lists.
	 */
	public void returnToSegmentPool() {
		// return all memory
		this.memorySegmentPool.returnAll(this.sortIndex);
		this.memorySegmentPool.returnAll(this.recordBufferSegments);
		this.sortIndex.clear();
		this.recordBufferSegments.clear();
	}

	/**
	 * Checks whether the buffer is empty.
	 *
	 * @return True, if no record is contained, false otherwise.
	 */
	public boolean isEmpty() {
		return this.numRecords == 0;
	}

	/**
	 * Returns all pages to the pool and releases floating pages. Unlike {@link #reset()}, no new
	 * sort-index segment is grabbed, so the buffer is not writable afterwards.
	 */
	public void dispose() {
		returnToSegmentPool();
		// release floating memory in advance
		releaseFloatingMemory();
	}

	/**
	 * Gets the capacity of the initially provided memory, in bytes.
	 *
	 * @return number of initial segments multiplied by the pool's page size
	 */
	public long getCapacity() {
		return ((long) this.totalNumBuffers) * memorySegmentPool.pageSize();
	}

	/**
	 * Gets the number of bytes currently occupied by record data plus sort-index entries.
	 *
	 * @return occupied bytes since the last reset
	 */
	public long getOccupancy() {
		return this.currentDataBufferOffset + this.sortIndexBytes;
	}

	/**
	 * Writes a given record to this sort buffer. The written record will be appended and take
	 * the last logical position.
	 *
	 * @param record The record to be written.
	 * @return True, if the record was successfully written, false, if the sort buffer was full.
	 * @throws IOException Thrown, if an error occurred while serializing the record into the buffers.
	 */
	public boolean write(BaseRow record) throws IOException {
		// check whether we need a new memory segment for the sort index
		if (!checkNextIndexOffset()) {
			return false;
		}

		// serialize the record into the data buffers
		int skip;
		try {
			skip = this.inputSerializer.serializeToPages(record, this.recordCollector);
		} catch (EOFException e) {
			// no more room for the record data: report the buffer as full
			return false;
		}

		final long newOffset = this.recordCollector.getCurrentOffset();
		// the serializer skipped `skip` bytes before the record, so it starts after them
		long currOffset = currentDataBufferOffset + skip;

		writeIndexAndNormalizedKey(record, currOffset);

		// account for the index entry just written; otherwise getOccupancy() ignores the index
		this.sortIndexBytes += this.indexEntrySize;
		this.currentDataBufferOffset = newOffset;

		return true;
	}

	/** Deserializes the record stored at {@code pointer} in the record buffer into {@code reuse}. */
	private BinaryRow getRecordFromBuffer(BinaryRow reuse, long pointer) throws IOException {
		this.recordBuffer.setReadPosition(pointer);
		return this.serializer.mapFromPages(reuse, this.recordBuffer);
	}

	// -------------------------------------------------------------------------

	/**
	 * Gets an iterator over all records in this buffer in their logical order, i.e. the order of
	 * the entries in the sort index.
	 *
	 * <p>Only {@link MutableObjectIterator#next(Object)} is supported; the no-argument
	 * {@code next()} throws {@link UnsupportedOperationException}.
	 *
	 * @return An iterator returning the records in their logical order.
	 */
	public final MutableObjectIterator<BinaryRow> getIterator() {
		return new MutableObjectIterator<BinaryRow>() {
			private final int size = size();
			private int current = 0;

			private int currentSegment = 0;
			private int currentOffset = 0;

			private MemorySegment currentIndexSegment = sortIndex.get(0);

			@Override
			public BinaryRow next(BinaryRow target) {
				if (this.current < this.size) {
					this.current++;
					// past the last entry of this index segment: move on to the next one
					if (this.currentOffset > lastIndexEntryOffset) {
						this.currentOffset = 0;
						this.currentIndexSegment = sortIndex.get(++this.currentSegment);
					}

					long pointer = this.currentIndexSegment.getLong(this.currentOffset);
					this.currentOffset += indexEntrySize;

					try {
						return getRecordFromBuffer(target, pointer);
					} catch (IOException ioe) {
						throw new RuntimeException(ioe);
					}
				} else {
					return null;
				}
			}

			@Override
			public BinaryRow next() {
				throw new UnsupportedOperationException("Not support!");
			}
		};
	}
}