blob: d582ac7cfae6aff24f172b40c4b193193fa0695c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.sort;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.RandomAccessInputView;
import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import java.io.IOException;
import java.util.ArrayList;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* In memory KV sortable buffer for binary row, it already has records in memory.
*/
public class BinaryKVInMemorySortBuffer extends BinaryIndexedSortable {
private final BinaryRowSerializer valueSerializer;
public static BinaryKVInMemorySortBuffer createBuffer(
NormalizedKeyComputer normalizedKeyComputer,
BinaryRowSerializer keySerializer,
BinaryRowSerializer valueSerializer,
RecordComparator comparator,
ArrayList<MemorySegment> recordBufferSegments,
long numElements,
MemorySegmentPool pool) throws IOException {
BinaryKVInMemorySortBuffer sorter = new BinaryKVInMemorySortBuffer(
normalizedKeyComputer, keySerializer, valueSerializer, comparator,
recordBufferSegments, pool);
sorter.load(numElements, sorter.recordBuffer);
return sorter;
}
private BinaryKVInMemorySortBuffer(
NormalizedKeyComputer normalizedKeyComputer,
BinaryRowSerializer keySerializer,
BinaryRowSerializer valueSerializer,
RecordComparator comparator,
ArrayList<MemorySegment> recordBufferSegments,
MemorySegmentPool memorySegmentPool) throws IOException {
super(normalizedKeyComputer, keySerializer, comparator, recordBufferSegments, memorySegmentPool);
this.valueSerializer = valueSerializer;
}
@Override
public void writeToOutput(AbstractPagedOutputView output) throws IOException {
final int numRecords = this.numRecords;
int currentMemSeg = 0;
int currentRecord = 0;
while (currentRecord < numRecords) {
final MemorySegment currentIndexSegment = this.sortIndex.get(currentMemSeg++);
// go through all records in the memory segment
for (int offset = 0; currentRecord < numRecords && offset <= this.lastIndexEntryOffset; currentRecord++,
offset += this.indexEntrySize) {
final long pointer = currentIndexSegment.getLong(offset);
this.recordBuffer.setReadPosition(pointer);
this.serializer.copyFromPagesToView(this.recordBuffer, output);
this.valueSerializer.copyFromPagesToView(this.recordBuffer, output);
}
}
}
public boolean load(long numElements, RandomAccessInputView recordInputView) throws IOException {
for (int index = 0; index < numElements; index++) {
serializer.checkSkipRead(recordInputView);
long pointer = recordInputView.getReadPosition();
BinaryRow row = serializer1.mapFromPages(row1, recordInputView);
valueSerializer.checkSkipRead(recordInputView);
recordInputView.skipBytes(recordInputView.readInt());
boolean success = checkNextIndexOffset();
checkArgument(success);
writeIndexAndNormalizedKey(row, pointer);
}
return true;
}
/**
* Gets an iterator over all KV records in this buffer in their logical order.
*
* @return An iterator returning the records in their logical order.
*/
public final MutableObjectIterator<Tuple2<BinaryRow, BinaryRow>> getIterator() {
return new MutableObjectIterator<Tuple2<BinaryRow, BinaryRow>>() {
private final int size = size();
private int current = 0;
private int currentSegment = 0;
private int currentOffset = 0;
private MemorySegment currentIndexSegment = sortIndex.get(0);
@Override
public Tuple2<BinaryRow, BinaryRow> next(Tuple2<BinaryRow, BinaryRow> kv) {
if (this.current < this.size) {
this.current++;
if (this.currentOffset > lastIndexEntryOffset) {
this.currentOffset = 0;
this.currentIndexSegment = sortIndex.get(++this.currentSegment);
}
long pointer = this.currentIndexSegment.getLong(this.currentOffset);
this.currentOffset += indexEntrySize;
try {
return getRecordFromBuffer(kv.f0, kv.f1, pointer);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
} else {
return null;
}
}
@Override
public Tuple2<BinaryRow, BinaryRow> next() {
throw new RuntimeException("Not support!");
}
};
}
private Tuple2<BinaryRow, BinaryRow> getRecordFromBuffer(
BinaryRow reuseKey, BinaryRow reuseValue, long pointer) throws IOException {
this.recordBuffer.setReadPosition(pointer);
reuseKey = this.serializer.mapFromPages(reuseKey, this.recordBuffer);
reuseValue = this.serializer.mapFromPages(reuseValue, this.recordBuffer);
return Tuple2.of(reuseKey, reuseValue);
}
}