blob: ba3582aa7f9d97917681a462bea3489e1ef60b30 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkState;
/**
 * File writer which can write buffers and generate {@link PartitionedFile}. Data is written region
 * by region. Before writing a new region, the method {@link PartitionedFileWriter#startNewRegion}
 * must be called. After writing all data, the method {@link PartitionedFileWriter#finish} must be
 * called to close all opened files and return the target {@link PartitionedFile}.
 */
@NotThreadSafe
public class PartitionedFileWriter implements AutoCloseable {

    /** Initial capacity of the in-memory index buffer: room for 50 index entries. */
    private static final int MIN_INDEX_BUFFER_SIZE = 50 * PartitionedFile.INDEX_ENTRY_SIZE;

    /**
     * Number of subpartitions. When writing a buffer, target subpartition must be in this range.
     */
    private final int numSubpartitions;

    /** Opened data file channel of the target {@link PartitionedFile}. */
    private final FileChannel dataFileChannel;

    /** Opened index file channel of the target {@link PartitionedFile}. */
    private final FileChannel indexFileChannel;

    /** Data file path of the target {@link PartitionedFile}. */
    private final Path dataFilePath;

    /** Index file path of the target {@link PartitionedFile}. */
    private final Path indexFilePath;

    /** Offset in the data file for each subpartition in the current region. */
    private final long[] subpartitionOffsets;

    /** Data size written in bytes for each subpartition in the current region. */
    private final long[] subpartitionBytes;

    /** Maximum number of bytes can be used to buffer index entries. */
    private final int maxIndexBufferSize;

    /** A piece of unmanaged memory for caching of region index entries. */
    private ByteBuffer indexBuffer;

    /**
     * Whether all index entries are cached in the index buffer or not. Becomes false once the
     * buffer has been flushed to the index file before {@link #finish()}, in which case the cache
     * is incomplete and must not be handed to the resulting {@link PartitionedFile}.
     */
    private boolean allIndexEntriesCached = true;

    /** Number of bytes written to the target {@link PartitionedFile}. */
    private long totalBytesWritten;

    /** Number of regions written to the target {@link PartitionedFile}. */
    private int numRegions;

    /** Total number of buffers in the data file. */
    private long numBuffers;

    /** Current subpartition to write buffers to. -1 means no subpartition selected yet. */
    private int currentSubpartition = -1;

    /**
     * Broadcast region is an optimization for the broadcast partition which writes the same data to
     * all subpartitions. For a broadcast region, data is only written once and the indexes of all
     * subpartitions point to the same offset in the data file.
     */
    private boolean isBroadcastRegion;

    /** Whether this file writer is finished or not. */
    private boolean isFinished;

    /** Whether this file writer is closed or not. */
    private boolean isClosed;

    /**
     * Creates a writer whose index buffer starts at {@link #MIN_INDEX_BUFFER_SIZE} and may grow up
     * to the given maximum.
     *
     * @param numSubpartitions number of subpartitions of the target {@link PartitionedFile}.
     * @param maxIndexBufferSize maximum bytes of memory used to cache index entries.
     * @param basePath base path; data and index file paths are derived by appending suffixes.
     */
    public PartitionedFileWriter(int numSubpartitions, int maxIndexBufferSize, String basePath)
            throws IOException {
        this(numSubpartitions, MIN_INDEX_BUFFER_SIZE, maxIndexBufferSize, basePath);
    }

    @VisibleForTesting
    PartitionedFileWriter(
            int numSubpartitions, int minIndexBufferSize, int maxIndexBufferSize, String basePath)
            throws IOException {
        checkArgument(numSubpartitions > 0, "Illegal number of subpartitions.");
        checkArgument(maxIndexBufferSize > 0, "Illegal maximum index cache size.");
        checkArgument(basePath != null, "Base path must not be null.");

        this.numSubpartitions = numSubpartitions;
        // round the cap down to a whole number of index entries
        this.maxIndexBufferSize = alignMaxIndexBufferSize(maxIndexBufferSize);
        this.subpartitionOffsets = new long[numSubpartitions];
        this.subpartitionBytes = new long[numSubpartitions];
        this.dataFilePath = new File(basePath + PartitionedFile.DATA_FILE_SUFFIX).toPath();
        this.indexFilePath = new File(basePath + PartitionedFile.INDEX_FILE_SUFFIX).toPath();

        this.indexBuffer = ByteBuffer.allocate(minIndexBufferSize);
        BufferReaderWriterUtil.configureByteBuffer(indexBuffer);

        this.dataFileChannel = openFileChannel(dataFilePath);
        try {
            this.indexFileChannel = openFileChannel(indexFilePath);
        } catch (Throwable throwable) {
            // ensure that the data file channel is closed if any exception occurs
            IOUtils.closeQuietly(dataFileChannel);
            IOUtils.deleteFileQuietly(dataFilePath);
            throw throwable;
        }
    }

    /** Opens the given path for writing; fails if the file already exists (CREATE_NEW). */
    private FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    }

    /** Rounds the configured cap down to an exact multiple of the index entry size. */
    private int alignMaxIndexBufferSize(int maxIndexBufferSize) {
        return maxIndexBufferSize
                / PartitionedFile.INDEX_ENTRY_SIZE
                * PartitionedFile.INDEX_ENTRY_SIZE;
    }

    /**
     * Persists the region index of the current data region and starts a new region to write.
     *
     * <p>Note: The caller is responsible for releasing the failed {@link PartitionedFile} if any
     * exception occurs.
     *
     * @param isBroadcastRegion Whether it's a broadcast region. See {@link #isBroadcastRegion}.
     */
    public void startNewRegion(boolean isBroadcastRegion) throws IOException {
        checkState(!isFinished, "File writer is already finished.");
        checkState(!isClosed, "File writer is already closed.");

        writeRegionIndex();
        this.isBroadcastRegion = isBroadcastRegion;
    }

    /**
     * Appends one (offset, numBytes) index entry to the in-memory index buffer, first growing the
     * buffer or, if it is already at {@link #maxIndexBufferSize}, spilling it to the index file.
     */
    private void writeIndexEntry(long subpartitionOffset, long numBytes) throws IOException {
        if (!indexBuffer.hasRemaining()) {
            if (!extendIndexBufferIfPossible()) {
                flushIndexBuffer();
                indexBuffer.clear();
                // entries were spilled to disk, so the in-memory cache is no longer complete
                allIndexEntriesCached = false;
            }
        }

        indexBuffer.putLong(subpartitionOffset);
        indexBuffer.putLong(numBytes);
    }

    /**
     * Doubles the index buffer capacity (capped at {@link #maxIndexBufferSize}), copying existing
     * entries over. Returns false if the buffer is already at the cap and cannot grow.
     */
    private boolean extendIndexBufferIfPossible() {
        if (indexBuffer.capacity() >= maxIndexBufferSize) {
            return false;
        }

        int newIndexBufferSize = Math.min(maxIndexBufferSize, 2 * indexBuffer.capacity());
        ByteBuffer newIndexBuffer = ByteBuffer.allocate(newIndexBufferSize);
        indexBuffer.flip();
        newIndexBuffer.put(indexBuffer);
        BufferReaderWriterUtil.configureByteBuffer(newIndexBuffer);
        indexBuffer = newIndexBuffer;
        return true;
    }

    /**
     * Writes one index entry per subpartition for the current region and resets the per-region
     * bookkeeping. No-op if nothing was written in the current region.
     */
    private void writeRegionIndex() throws IOException {
        if (Arrays.stream(subpartitionBytes).sum() > 0) {
            for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
                writeIndexEntry(subpartitionOffsets[subpartition], subpartitionBytes[subpartition]);
            }

            currentSubpartition = -1;
            ++numRegions;
            Arrays.fill(subpartitionBytes, 0);
        }
    }

    /** Flips the index buffer and writes any buffered index entries to the index file channel. */
    private void flushIndexBuffer() throws IOException {
        indexBuffer.flip();
        if (indexBuffer.limit() > 0) {
            BufferReaderWriterUtil.writeBuffer(indexFileChannel, indexBuffer);
        }
    }

    /**
     * Writes a list of {@link Buffer}s to this {@link PartitionedFile}. It guarantees that after
     * the return of this method, the target buffers can be released. In a data region, all data of
     * the same subpartition must be written together.
     *
     * <p>Note: The caller is responsible for recycling the target buffers and releasing the failed
     * {@link PartitionedFile} if any exception occurs.
     */
    public void writeBuffers(List<BufferWithSubpartition> bufferWithSubpartitions)
            throws IOException {
        checkState(!isFinished, "File writer is already finished.");
        checkState(!isClosed, "File writer is already closed.");
        if (bufferWithSubpartitions.isEmpty()) {
            return;
        }

        numBuffers += bufferWithSubpartitions.size();

        long expectedBytes;
        // each buffer occupies two slots: one for its header, one for its payload
        ByteBuffer[] bufferWithHeaders = new ByteBuffer[2 * bufferWithSubpartitions.size()];

        if (isBroadcastRegion) {
            expectedBytes = collectBroadcastBuffers(bufferWithSubpartitions, bufferWithHeaders);
        } else {
            expectedBytes = collectUnicastBuffers(bufferWithSubpartitions, bufferWithHeaders);
        }

        totalBytesWritten += expectedBytes;
        BufferReaderWriterUtil.writeBuffers(dataFileChannel, expectedBytes, bufferWithHeaders);
    }

    /**
     * Fills {@code bufferWithHeaders} for a unicast region, updating per-subpartition offsets and
     * byte counts. Enforces that all data of one subpartition is written contiguously.
     *
     * @return total number of bytes (headers + payloads) to be written.
     */
    private long collectUnicastBuffers(
            List<BufferWithSubpartition> bufferWithSubpartitions, ByteBuffer[] bufferWithHeaders) {
        long expectedBytes = 0;
        long fileOffset = totalBytesWritten;

        for (int i = 0; i < bufferWithSubpartitions.size(); i++) {
            int subpartition = bufferWithSubpartitions.get(i).getSubpartitionIndex();
            if (subpartition != currentSubpartition) {
                // a subpartition may only be entered once per region
                checkState(
                        subpartitionBytes[subpartition] == 0,
                        "Must write data of the same subpartition together.");
                subpartitionOffsets[subpartition] = fileOffset;
                currentSubpartition = subpartition;
            }

            Buffer buffer = bufferWithSubpartitions.get(i).getBuffer();
            int numBytes = setBufferWithHeader(buffer, bufferWithHeaders, 2 * i);
            expectedBytes += numBytes;
            fileOffset += numBytes;
            subpartitionBytes[subpartition] += numBytes;
        }
        return expectedBytes;
    }

    /**
     * Fills {@code bufferWithHeaders} for a broadcast region. The data is written once; every
     * subpartition's index entry points at the same file offset and is credited the same bytes.
     *
     * @return total number of bytes (headers + payloads) to be written.
     */
    private long collectBroadcastBuffers(
            List<BufferWithSubpartition> bufferWithSubpartitions, ByteBuffer[] bufferWithHeaders) {
        // set the file offset of all subpartitions as the current file size on the first call
        if (subpartitionBytes[0] == 0) {
            for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
                subpartitionOffsets[subpartition] = totalBytesWritten;
            }
        }

        long expectedBytes = 0;
        for (int i = 0; i < bufferWithSubpartitions.size(); i++) {
            Buffer buffer = bufferWithSubpartitions.get(i).getBuffer();
            int numBytes = setBufferWithHeader(buffer, bufferWithHeaders, 2 * i);
            expectedBytes += numBytes;
        }

        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            subpartitionBytes[subpartition] += expectedBytes;
        }
        return expectedBytes;
    }

    /**
     * Places the buffer's serialized header at {@code index} and its payload at {@code index + 1}
     * in the output array.
     *
     * @return number of bytes the header and payload will occupy in the data file.
     */
    private int setBufferWithHeader(Buffer buffer, ByteBuffer[] bufferWithHeaders, int index) {
        ByteBuffer header = BufferReaderWriterUtil.allocatedHeaderBuffer();
        BufferReaderWriterUtil.setByteChannelBufferHeader(buffer, header);

        bufferWithHeaders[index] = header;
        bufferWithHeaders[index + 1] = buffer.getNioBufferReadable();

        return header.remaining() + buffer.readableBytes();
    }

    /**
     * Finishes writing the {@link PartitionedFile} which closes the file channel and returns the
     * corresponding {@link PartitionedFile}.
     *
     * <p>Note: The caller is responsible for releasing the failed {@link PartitionedFile} if any
     * exception occurs.
     */
    public PartitionedFile finish() throws IOException {
        checkState(!isFinished, "File writer is already finished.");
        checkState(!isClosed, "File writer is already closed.");

        isFinished = true;

        writeRegionIndex();
        flushIndexBuffer();
        // flushIndexBuffer flipped the buffer; rewind so cached entries can be read from position 0
        indexBuffer.rewind();

        long dataFileSize = dataFileChannel.size();
        long indexFileSize = indexFileChannel.size();
        close();

        // only pass the in-memory cache along when it holds ALL index entries; otherwise readers
        // must fall back to the index file
        ByteBuffer indexEntryCache = null;
        if (allIndexEntriesCached) {
            indexEntryCache = indexBuffer;
        }
        indexBuffer = null;
        return new PartitionedFile(
                numRegions,
                numSubpartitions,
                dataFilePath,
                indexFilePath,
                dataFileSize,
                indexFileSize,
                numBuffers,
                indexEntryCache);
    }

    /** Used to close and delete the failed {@link PartitionedFile} when any exception occurs. */
    public void releaseQuietly() {
        IOUtils.closeQuietly(this);
        IOUtils.deleteFileQuietly(dataFilePath);
        IOUtils.deleteFileQuietly(indexFilePath);
    }

    /**
     * Closes both file channels. Always attempts both closes; if either fails, the first exception
     * is thrown with any later one attached as suppressed. Idempotent.
     */
    @Override
    public void close() throws IOException {
        if (isClosed) {
            return;
        }
        isClosed = true;

        IOException exception = null;
        try {
            dataFileChannel.close();
        } catch (IOException ioException) {
            exception = ioException;
        }

        try {
            indexFileChannel.close();
        } catch (IOException ioException) {
            exception = ExceptionUtils.firstOrSuppressed(ioException, exception);
        }

        if (exception != null) {
            throw exception;
        }
    }
}