/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.sort;
import org.apache.flink.api.common.io.blockcompression.BlockCompressionFactory;
import org.apache.flink.api.common.io.blockcompression.BlockCompressionFactoryLoader;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.operators.sort.IndexedSorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.table.api.TableConfigOptions;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.util.AbstractChannelWriterOutputView;
import org.apache.flink.table.runtime.util.ChannelWithMeta;
import org.apache.flink.table.runtime.util.FileChannelUtil;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Sorter for buffered key-value input.
 * First, it sorts the buffered input and spills it to disk as sorted runs;
 * second, it merges the spilled runs and returns an iterator over the sorted result.
 */
public class BufferedKVExternalSorter {
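/*
 * Minimal usage sketch (hypothetical caller, for illustration only; variable names such as
 * bufferSegments and memorySegmentPool are assumptions, not part of this class):
 *
 *   BufferedKVExternalSorter sorter = new BufferedKVExternalSorter(
 *       ioManager, keySerializer, valueSerializer, computer, comparator, pageSize, conf);
 *   // every time the caller's in-memory kv buffer fills up:
 *   sorter.sortAndSpill(bufferSegments, numElements, memorySegmentPool);
 *   // after all input has been consumed:
 *   MutableObjectIterator<Tuple2<BinaryRow, BinaryRow>> sorted = sorter.getKVIterator();
 *   // ... consume the iterator, then release resources:
 *   sorter.close();
 */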
private static final Logger LOG = LoggerFactory.getLogger(BufferedKVExternalSorter.class);
private volatile boolean closed = false;
private final NormalizedKeyComputer nKeyComputer;
private final RecordComparator comparator;
private final BinaryRowSerializer keySerializer;
private final BinaryRowSerializer valueSerializer;
private final IndexedSorter sorter;
private final BinaryKVExternalMerger merger;
private final IOManager ioManager;
private final int maxNumFileHandles;
private final FileIOChannel.Enumerator enumerator;
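// metadata of the spilled sorted runs, consumed by getKVIterator() for the final merge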
private final List<ChannelWithMeta> channelIDs = new ArrayList<>();
private final SpillChannelManager channelManager;
private int pageSize;
// spill metrics
private long numSpillFiles;
private long spillInBytes;
private long spillInCompressedBytes;
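// spill compression settings, read from TableConfigOptions in the constructor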
private final boolean compressionEnable;
private final BlockCompressionFactory compressionCodecFactory;
private final int compressionBlockSize;
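/**
 * Creates a sorter that writes sorted key-value runs through the given {@link IOManager}.
 * The merge fan-in, spill compression switch, codec and compression block size are read
 * from {@code conf} via {@link TableConfigOptions}.
 */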
public BufferedKVExternalSorter(
IOManager ioManager,
BinaryRowSerializer keySerializer,
BinaryRowSerializer valueSerializer,
NormalizedKeyComputer nKeyComputer,
RecordComparator comparator,
int pageSize,
Configuration conf) throws IOException {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.nKeyComputer = nKeyComputer;
this.comparator = comparator;
this.pageSize = pageSize;
this.sorter = new QuickSort();
this.maxNumFileHandles = conf.getInteger(TableConfigOptions.SQL_EXEC_SORT_FILE_HANDLES_MAX_NUM);
this.compressionEnable = conf.getBoolean(TableConfigOptions.SQL_EXEC_SPILL_COMPRESSION_ENABLED);
this.compressionCodecFactory = this.compressionEnable
? BlockCompressionFactoryLoader.createBlockCompressionFactory(conf.getString(
TableConfigOptions.SQL_EXEC_SPILL_COMPRESSION_CODEC), conf)
: null;
this.compressionBlockSize = conf.getInteger(TableConfigOptions.SQL_EXEC_SPILL_COMPRESSION_BLOCK_SIZE);
this.ioManager = ioManager;
this.enumerator = this.ioManager.createChannelEnumerator();
this.channelManager = new SpillChannelManager();
this.merger = new BinaryKVExternalMerger(
ioManager, pageSize,
maxNumFileHandles, channelManager,
keySerializer, valueSerializer, comparator,
compressionEnable,
compressionCodecFactory,
compressionBlockSize);
}
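/**
 * Merges all spilled runs and returns an iterator over the fully sorted key-value pairs.
 * If more spill channels exist than {@code maxNumFileHandles}, intermediate merges are
 * performed first to stay within the file handle limit.
 */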
public MutableObjectIterator<Tuple2<BinaryRow, BinaryRow>> getKVIterator() throws IOException {
// 1. intermediate merges: merge spill channels until no more than
// maxNumFileHandles remain, so the final merge stays within the file handle limit
List<ChannelWithMeta> channelIDs = this.channelIDs;
while (!closed && channelIDs.size() > this.maxNumFileHandles) {
channelIDs = merger.mergeChannelList(channelIDs);
}
// 2. final merge
List<FileIOChannel> openChannels = new ArrayList<>();
BinaryMergeIterator<Tuple2<BinaryRow, BinaryRow>> iterator =
merger.getMergingIterator(channelIDs, openChannels);
channelManager.addOpenChannels(openChannels);
return iterator;
}
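/**
 * Sorts the key-value records held in the given memory segments and spills them as one
 * sorted run to a new file channel, which is then registered for the final merge.
 */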
public void sortAndSpill(
ArrayList<MemorySegment> recordBufferSegments,
long numElements,
MemorySegmentPool pool) throws IOException {
// 1. sort buffer
BinaryKVInMemorySortBuffer buffer =
BinaryKVInMemorySortBuffer.createBuffer(
nKeyComputer, keySerializer, valueSerializer, comparator,
recordBufferSegments, numElements, pool);
this.sorter.sort(buffer);
// 2. spill
FileIOChannel.ID channel = enumerator.next();
channelManager.addChannel(channel);
AbstractChannelWriterOutputView output = null;
int bytesInLastBuffer;
int blockCount;
try {
numSpillFiles++;
output = FileChannelUtil.createOutputView(ioManager, channel, compressionEnable,
compressionCodecFactory, compressionBlockSize, pageSize);
buffer.writeToOutput(output);
spillInBytes += output.getNumBytes();
spillInCompressedBytes += output.getNumCompressedBytes();
bytesInLastBuffer = output.close();
blockCount = output.getBlockCount();
LOG.info("here spill the {}th kv external buffer data with {} bytes and {} compressed bytes",
numSpillFiles, spillInBytes, spillInCompressedBytes);
} catch (IOException e) {
if (output != null) {
output.closeAndDelete();
}
throw e;
}
channelIDs.add(new ChannelWithMeta(channel, blockCount, bytesInLastBuffer));
}
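/**
 * Closes the sorter, the merger and all registered spill channels. Subsequent calls are no-ops.
 */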
public void close() {
if (closed) {
return;
}
// mark as closed
closed = true;
merger.close();
channelManager.close();
}
}