/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
 * A {@link SortBuffer} implementation which sorts all appended records only by subpartition index.
 * Records belonging to the same subpartition keep the order in which they were appended.
 *
 * <p>It maintains a list of {@link MemorySegment}s as a joint buffer. Data will be appended to the
 * joint buffer sequentially. When writing a record, an index entry is appended first. An index
 * entry consists of 3 fields: 4 bytes for the record length, 4 bytes for the {@link DataType} and
 * 8 bytes for the address pointing to the next index entry of the same channel, which is used to
 * locate the next record to read when copying data from this {@link SortBuffer}. For simplicity,
 * no index entry can span multiple segments. The corresponding record data is stored right after
 * its index entry and, unlike the index entry, a record has variable length and thus may span
 * multiple segments.
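 *
 * <p>As a concrete illustration (example values only, not prescribed by the implementation):
 * appending a 100-byte data record for subpartition 3 first writes a 16-byte index entry whose
 * first 8 bytes encode {@code (100L << 32) | DataType.DATA_BUFFER.ordinal()}; the following 8
 * bytes are reserved for the address of subpartition 3's next index entry and are filled in once
 * that next record is appended. The 100 record bytes follow immediately and may cross segment
 * boundaries.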
*/
@NotThreadSafe
public class PartitionSortedBuffer implements SortBuffer {
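/**
 * Lock guarding {@link #buffers} and {@link #isReleased}, since this sort buffer can be released
 * by a thread other than the writer.
 */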
private final Object lock;
/**
* Size of an index entry: 4 bytes for record length, 4 bytes for data type and 8 bytes for
* pointer to next entry.
*/
private static final int INDEX_ENTRY_SIZE = 4 + 4 + 8;
/** A buffer pool to request memory segments from. */
private final BufferPool bufferPool;
/** A segment list as a joint buffer which stores all records and index entries. */
@GuardedBy("lock")
private final ArrayList<MemorySegment> buffers = new ArrayList<>();
/** Addresses of the first record's index entry for each subpartition. */
private final long[] firstIndexEntryAddresses;
/** Addresses of the last record's index entry for each subpartition. */
private final long[] lastIndexEntryAddresses;
/** Size of buffers requested from buffer pool. All buffers must be of the same size. */
private final int bufferSize;
// ---------------------------------------------------------------------------------------------
// Statistics and states
// ---------------------------------------------------------------------------------------------
/** Total number of bytes already appended to this sort buffer. */
private long numTotalBytes;
/** Total number of records already appended to this sort buffer. */
private long numTotalRecords;
/** Total number of bytes already read from this sort buffer. */
private long numTotalBytesRead;
/** Whether this sort buffer is finished. One can only read a finished sort buffer. */
private boolean isFinished;
/** Whether this sort buffer is released. A released sort buffer cannot be used. */
@GuardedBy("lock")
private boolean isReleased;
// ---------------------------------------------------------------------------------------------
// For writing
// ---------------------------------------------------------------------------------------------
/** Index in the segment list of the buffer currently available for writing. */
private int writeSegmentIndex;
/** Next write position within the buffer currently available for writing. */
private int writeSegmentOffset;
// ---------------------------------------------------------------------------------------------
// For reading
// ---------------------------------------------------------------------------------------------
/** Data of different subpartitions in this sort buffer will be read in this order. */
private final int[] subpartitionReadOrder;
/** Index entry address of the current record or event to be read. */
private long readIndexEntryAddress;
/** Bytes of the current record remaining after the last copy, which must be read first in the next copy. */
private int recordRemainingBytes;
/** Index into {@link #subpartitionReadOrder} pointing to the channel currently being read. */
private int readOrderIndex = -1;
public PartitionSortedBuffer(
Object lock,
BufferPool bufferPool,
int numSubpartitions,
int bufferSize,
@Nullable int[] customReadOrder) {
checkArgument(bufferSize > INDEX_ENTRY_SIZE, "Buffer size is too small.");
this.lock = checkNotNull(lock);
this.bufferPool = checkNotNull(bufferPool);
this.bufferSize = bufferSize;
this.firstIndexEntryAddresses = new long[numSubpartitions];
this.lastIndexEntryAddresses = new long[numSubpartitions];
// an address of -1 means the corresponding channel has no data yet
Arrays.fill(firstIndexEntryAddresses, -1L);
Arrays.fill(lastIndexEntryAddresses, -1L);
this.subpartitionReadOrder = new int[numSubpartitions];
if (customReadOrder != null) {
checkArgument(customReadOrder.length == numSubpartitions, "Illegal data read order.");
System.arraycopy(customReadOrder, 0, this.subpartitionReadOrder, 0, numSubpartitions);
} else {
for (int channel = 0; channel < numSubpartitions; ++channel) {
this.subpartitionReadOrder[channel] = channel;
}
}
}
@Override
public boolean append(ByteBuffer source, int targetChannel, DataType dataType)
throws IOException {
checkArgument(source.hasRemaining(), "Cannot append empty data.");
checkState(!isFinished, "Sort buffer is already finished.");
checkState(!isReleased, "Sort buffer is already released.");
int totalBytes = source.remaining();
// return false directly if enough buffers cannot be allocated for the given record
if (!allocateBuffersForRecord(totalBytes)) {
return false;
}
// write the index entry and record or event data
writeIndex(targetChannel, totalBytes, dataType);
writeRecord(source);
++numTotalRecords;
numTotalBytes += totalBytes;
return true;
}
private void writeIndex(int channelIndex, int numRecordBytes, Buffer.DataType dataType) {
MemorySegment segment = buffers.get(writeSegmentIndex);
// record length takes the high 32 bits and data type takes the low 32 bits
segment.putLong(writeSegmentOffset, ((long) numRecordBytes << 32) | dataType.ordinal());
// segment index takes the high 32 bits and segment offset takes the low 32 bits
long indexEntryAddress = ((long) writeSegmentIndex << 32) | writeSegmentOffset;
long lastIndexEntryAddress = lastIndexEntryAddresses[channelIndex];
lastIndexEntryAddresses[channelIndex] = indexEntryAddress;
if (lastIndexEntryAddress >= 0) {
// link the previous index entry of the given channel to the new index entry
segment = buffers.get(getSegmentIndexFromPointer(lastIndexEntryAddress));
segment.putLong(
getSegmentOffsetFromPointer(lastIndexEntryAddress) + 8, indexEntryAddress);
} else {
firstIndexEntryAddresses[channelIndex] = indexEntryAddress;
}
// move the write position forward so as to write the corresponding record
updateWriteSegmentIndexAndOffset(INDEX_ENTRY_SIZE);
}
private void writeRecord(ByteBuffer source) {
while (source.hasRemaining()) {
MemorySegment segment = buffers.get(writeSegmentIndex);
int toCopy = Math.min(bufferSize - writeSegmentOffset, source.remaining());
segment.put(writeSegmentOffset, source, toCopy);
// move the write position forward so as to write the remaining bytes or next record
updateWriteSegmentIndexAndOffset(toCopy);
}
}
private boolean allocateBuffersForRecord(int numRecordBytes) throws IOException {
int numBytesRequired = INDEX_ENTRY_SIZE + numRecordBytes;
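// no bytes are available when the write position already points past the last allocated segment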
int availableBytes =
writeSegmentIndex == buffers.size() ? 0 : bufferSize - writeSegmentOffset;
// return directly if the currently available bytes are sufficient
if (availableBytes >= numBytesRequired) {
return true;
}
// skip the remaining free space if the available bytes are not enough for an index entry
if (availableBytes < INDEX_ENTRY_SIZE) {
updateWriteSegmentIndexAndOffset(availableBytes);
availableBytes = 0;
}
// allocate exactly enough buffers for the appended record
do {
MemorySegment segment = requestBufferFromPool();
if (segment == null) {
// return false if we cannot allocate enough buffers for the appended record
return false;
}
availableBytes += bufferSize;
addBuffer(segment);
} while (availableBytes < numBytesRequired);
return true;
}
private void addBuffer(MemorySegment segment) {
synchronized (lock) {
if (segment.size() != bufferSize) {
bufferPool.recycle(segment);
throw new IllegalStateException("Illegal memory segment size.");
}
if (isReleased) {
bufferPool.recycle(segment);
throw new IllegalStateException("Sort buffer is already released.");
}
buffers.add(segment);
}
}
private MemorySegment requestBufferFromPool() throws IOException {
try {
// request a buffer in a blocking way as long as the guaranteed memory has not been used up
if (buffers.size() < bufferPool.getNumberOfRequiredMemorySegments()) {
return bufferPool.requestBufferBuilderBlocking().getMemorySegment();
}
} catch (InterruptedException e) {
throw new IOException("Interrupted while requesting buffer.", e);
}
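// beyond the guaranteed memory, fall back to a non-blocking request which yields null when no buffer is currently available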
BufferBuilder buffer = bufferPool.requestBufferBuilder();
return buffer != null ? buffer.getMemorySegment() : null;
}
private void updateWriteSegmentIndexAndOffset(int numBytes) {
writeSegmentOffset += numBytes;
// use the next available free buffer if the current one is full
if (writeSegmentOffset == bufferSize) {
++writeSegmentIndex;
writeSegmentOffset = 0;
}
}
@Override
public BufferWithChannel copyIntoSegment(MemorySegment target) {
checkState(hasRemaining(), "No data remaining.");
checkState(isFinished, "Should finish the sort buffer first before copying any data.");
checkState(!isReleased, "Sort buffer is already released.");
int numBytesCopied = 0;
DataType bufferDataType = DataType.DATA_BUFFER;
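// the subpartition whose data is copied in this call, chosen according to the configured read order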
int channelIndex = subpartitionReadOrder[readOrderIndex];
do {
int sourceSegmentIndex = getSegmentIndexFromPointer(readIndexEntryAddress);
int sourceSegmentOffset = getSegmentOffsetFromPointer(readIndexEntryAddress);
MemorySegment sourceSegment = buffers.get(sourceSegmentIndex);
long lengthAndDataType = sourceSegment.getLong(sourceSegmentOffset);
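// the record length occupies the high 32 bits and the data type ordinal the low 32 bits, so the pointer decoding helpers below can be reused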
int length = getSegmentIndexFromPointer(lengthAndDataType);
DataType dataType = DataType.values()[getSegmentOffsetFromPointer(lengthAndDataType)];
// if the next entry to read is an event, return the data already read so the event gets its own buffer
if (dataType.isEvent() && numBytesCopied > 0) {
break;
}
bufferDataType = dataType;
// get the next index entry address and move the read position forward
long nextReadIndexEntryAddress = sourceSegment.getLong(sourceSegmentOffset + 8);
sourceSegmentOffset += INDEX_ENTRY_SIZE;
// allocate a temp buffer for the event if the target buffer is not big enough
if (bufferDataType.isEvent() && target.size() < length) {
target = MemorySegmentFactory.allocateUnpooledSegment(length);
}
numBytesCopied +=
copyRecordOrEvent(
target,
numBytesCopied,
sourceSegmentIndex,
sourceSegmentOffset,
length);
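// recordRemainingBytes == 0 means the current record or event has been copied completely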
if (recordRemainingBytes == 0) {
// move to the next channel if the current channel has been fully read
if (readIndexEntryAddress == lastIndexEntryAddresses[channelIndex]) {
updateReadChannelAndIndexEntryAddress();
break;
}
readIndexEntryAddress = nextReadIndexEntryAddress;
}
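// keep filling the target segment with data buffers; events always terminate the copy loop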
} while (numBytesCopied < target.size() && bufferDataType.isBuffer());
numTotalBytesRead += numBytesCopied;
Buffer buffer = new NetworkBuffer(target, (buf) -> {}, bufferDataType, numBytesCopied);
return new BufferWithChannel(buffer, channelIndex);
}
private int copyRecordOrEvent(
MemorySegment targetSegment,
int targetSegmentOffset,
int sourceSegmentIndex,
int sourceSegmentOffset,
int recordLength) {
if (recordRemainingBytes > 0) {
// skip the data already read if there is remaining partial record after the previous
// copy
long position = (long) sourceSegmentOffset + (recordLength - recordRemainingBytes);
sourceSegmentIndex += (position / bufferSize);
sourceSegmentOffset = (int) (position % bufferSize);
} else {
recordRemainingBytes = recordLength;
}
int targetSegmentSize = targetSegment.size();
int numBytesToCopy =
Math.min(targetSegmentSize - targetSegmentOffset, recordRemainingBytes);
do {
// move to the next source buffer if all data of the current one has been copied
if (sourceSegmentOffset == bufferSize) {
++sourceSegmentIndex;
sourceSegmentOffset = 0;
}
int sourceRemainingBytes =
Math.min(bufferSize - sourceSegmentOffset, recordRemainingBytes);
int numBytes = Math.min(targetSegmentSize - targetSegmentOffset, sourceRemainingBytes);
MemorySegment sourceSegment = buffers.get(sourceSegmentIndex);
sourceSegment.copyTo(sourceSegmentOffset, targetSegment, targetSegmentOffset, numBytes);
recordRemainingBytes -= numBytes;
targetSegmentOffset += numBytes;
sourceSegmentOffset += numBytes;
} while (recordRemainingBytes > 0 && targetSegmentOffset < targetSegmentSize);
return numBytesToCopy;
}
private void updateReadChannelAndIndexEntryAddress() {
// skip the channels without any data
while (++readOrderIndex < firstIndexEntryAddresses.length) {
int channelIndex = subpartitionReadOrder[readOrderIndex];
if ((readIndexEntryAddress = firstIndexEntryAddresses[channelIndex]) >= 0) {
break;
}
}
}
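/** Extracts the value encoded in the high 32 bits, e.g. the segment index of an index entry address. */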
private int getSegmentIndexFromPointer(long value) {
return (int) (value >>> 32);
}
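/** Extracts the value encoded in the low 32 bits, e.g. the segment offset of an index entry address. */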
private int getSegmentOffsetFromPointer(long value) {
return (int) (value);
}
@Override
public long numRecords() {
return numTotalRecords;
}
@Override
public long numBytes() {
return numTotalBytes;
}
@Override
public boolean hasRemaining() {
return numTotalBytesRead < numTotalBytes;
}
@Override
public void finish() {
checkState(!isFinished, "SortBuffer is already finished.");
isFinished = true;
// prepare for reading
updateReadChannelAndIndexEntryAddress();
}
@Override
public boolean isFinished() {
return isFinished;
}
@Override
public void release() {
// the sort buffer can be released by other threads
synchronized (lock) {
if (isReleased) {
return;
}
isReleased = true;
for (MemorySegment segment : buffers) {
bufferPool.recycle(segment);
}
buffers.clear();
numTotalBytes = 0;
numTotalRecords = 0;
}
}
@Override
public boolean isReleased() {
synchronized (lock) {
return isReleased;
}
}
}