blob: 12f083557e3b2e3e1847a82f7cb9d164e9833262 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.table.runtime.sort;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.ExceptionHandler;
import org.apache.flink.runtime.operators.sort.IndexedSorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.runtime.operators.sort.SortedDataFile;
import org.apache.flink.runtime.operators.sort.Sorter;
import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
import org.apache.flink.table.api.TableConfigOptions;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.util.AbstractChannelWriterOutputView;
import org.apache.flink.table.runtime.util.ChannelWithMeta;
import org.apache.flink.table.runtime.util.FileChannelUtil;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * The {@link BinaryExternalSorter} is a full-fledged sorter. It implements a multi-way merge sort.
 * Internally, it has three asynchronous threads (sort, spill, merge) which communicate through a
 * set of blocking circularQueues, forming a closed loop. Memory is allocated using the
 * {@link MemoryManager} interface. Thus the component will not exceed the provided memory limits.
 */
public class BinaryExternalSorter implements Sorter<BinaryRow> {
// ------------------------------------------------------------------------
// Constants
// ------------------------------------------------------------------------
/** The minimum number of segments that are required for the sort to operate. */
protected static final int MIN_NUM_SORT_MEM_SEGMENTS = 10;
public static final long SORTER_MIN_NUM_SORT_MEM =
/** Logging. */
private static final Logger LOG = LoggerFactory.getLogger(BinaryExternalSorter.class);
// ------------------------------------------------------------------------
// Threads
// ------------------------------------------------------------------------
/** The element that is passed as marker for the end of data. */
private static final CircularElement EOF_MARKER = new CircularElement();
/** The element that is passed as marker for signaling the beginning of spilling. */
private static final CircularElement SPILLING_MARKER = new CircularElement();
* The ChannelWithMeta that is passed as marker for signaling the final merge.
private static final ChannelWithMeta FINAL_MERGE_MARKER = new ChannelWithMeta(null, -1, -1);
// ------------------------------------------------------------------------
// Memory
// ------------------------------------------------------------------------
* The memory segments used first for sorting and later for reading/pre-fetching
* during the external merge.
protected final List<List<MemorySegment>> sortReadMemory;
* Records all sort buffer.
protected final List<BinaryInMemorySortBuffer> sortBuffers;
protected final int fixedReadMemoryNum;
/** The memory manager through which memory is allocated and released. */
protected final MemoryManager memoryManager;
// ------------------------------------------------------------------------
// Miscellaneous Fields
// ------------------------------------------------------------------------
* The monitor which guards the iterator field.
protected final Object iteratorLock = new Object();
/** The thread that sorts the filled buffers. */
private ThreadBase sortThread;
/** The thread that handles spilling to secondary storage. */
private ThreadBase spillThread;
/** The thread that handles merging from the secondary storage. */
private ThreadBase mergeThread;
* Final result iterator.
protected volatile MutableObjectIterator<BinaryRow> iterator;
* The exception that is set, if the iterator cannot be created.
protected volatile IOException iteratorException;
* Flag indicating that the sorter was closed.
protected volatile boolean closed;
/** Handler for exceptions raised by the sort, spill or merge thread. */
private ExceptionHandler<IOException> exceptionHandler;
* Queue for the communication between the threads.
private CircularQueues circularQueues;
private long bytesUntilSpilling;
private CircularElement currWriteBuffer;
private boolean writingDone = false;
private final Object writeLock = new Object();
private final SpillChannelManager channelManager;
private final BinaryExternalMerger merger;
private final int memorySegmentSize;
private final boolean compressionEnable;
private final BlockCompressionFactory compressionCodecFactory;
private final int compressionBlockSize;
private final boolean asyncMergeEnable;
// ------------------------------------------------------------------------
// Constructor & Shutdown
// ------------------------------------------------------------------------
private final BinaryRowSerializer serializer;
private long numSpillFiles;
private long spillInBytes;
private long spillInCompressedBytes;
// Convenience constructor: forwards its arguments to the constructor overload that additionally
// takes a startSpillingFraction.
// NOTE(review): the trailing arguments of the this(...) call and the closing brace are missing
// from this text; presumably conf and a default spilling fraction are passed — confirm.
public BinaryExternalSorter(
final Object owner, MemoryManager memoryManager, long reservedMemorySize, long maxMemorySize,
long perRequestMemorySize, IOManager ioManager, TypeSerializer<BaseRow> inputSerializer,
BinaryRowSerializer serializer, NormalizedKeyComputer normalizedKeyComputer,
RecordComparator comparator, Configuration conf) throws IOException {
this(owner, memoryManager, reservedMemorySize, maxMemorySize, perRequestMemorySize, ioManager,
inputSerializer, serializer, normalizedKeyComputer, comparator,
// Main constructor: reads the sort/spill settings from conf, allocates the reserved pages from
// the memory manager, distributes them over one or two in-memory sort buffers, and creates the
// merger plus the sort, spill and merge threads through the get*Thread factory methods.
// NOTE(review): this text has lost many lines (closing braces, whole statements, parts of
// expressions). The comments below describe only what is still visible.
public BinaryExternalSorter(
final Object owner, MemoryManager memoryManager, long reservedMemorySize, long maxMemorySize,
long perRequestMemorySize, IOManager ioManager, TypeSerializer<BaseRow> inputSerializer,
BinaryRowSerializer serializer,
NormalizedKeyComputer normalizedKeyComputer,
RecordComparator comparator, Configuration conf,
float startSpillingFraction) throws IOException {
int maxNumFileHandles = conf.getInteger(TableConfigOptions.SQL_EXEC_SORT_FILE_HANDLES_MAX_NUM);
this.compressionEnable = conf.getBoolean(TableConfigOptions.SQL_EXEC_SPILL_COMPRESSION_ENABLED);
// the codec factory is only created when spill compression is enabled; otherwise it stays null
// NOTE(review): the argument of conf.getString(...) is missing from this text
this.compressionCodecFactory = this.compressionEnable
? BlockCompressionFactoryLoader.createBlockCompressionFactory(conf.getString(
: null;
compressionBlockSize = conf.getInteger(TableConfigOptions.SQL_EXEC_SPILL_COMPRESSION_BLOCK_SIZE);
asyncMergeEnable = conf.getBoolean(TableConfigOptions.SQL_EXEC_SORT_ASYNC_MERGE_ENABLED);
// reject configurations that allow fewer than two file handles
checkArgument(maxNumFileHandles >= 2);
// this sorter keeps its own duplicate of the serializer
this.serializer = (BinaryRowSerializer) serializer.duplicate();
this.memoryManager = checkNotNull(memoryManager);
this.memorySegmentSize = memoryManager.getPageSize();
// NOTE(review): reservedMemorySize is divided by the page size below, so it is a byte count,
// yet the message reports SORTER_MIN_NUM_SORT_MEM as "pages" — confirm the intended unit.
if (reservedMemorySize < SORTER_MIN_NUM_SORT_MEM) {
throw new IllegalArgumentException("Too little memory provided to sorter to perform task. " +
"Required are at least " + SORTER_MIN_NUM_SORT_MEM +
" pages. Current page size is " + memoryManager.getPageSize() + " bytes.");
// adjust the memory quotas to the page size
final int sortMemPages = (int) (reservedMemorySize / memoryManager.getPageSize());
final long sortMemory = ((long) sortMemPages) * memoryManager.getPageSize();
// decide how many sort buffers to use
// (two buffers once the larger of maxMemorySize and reservedMemorySize exceeds 100 MiB)
int numSortBuffers = 1;
final long sortMaxMemSize = Math.max(maxMemorySize, reservedMemorySize);
if (sortMaxMemSize > 100 * 1024 * 1024L) {
numSortBuffers = 2;
final int numSegmentsPerSortBuffer = sortMemPages / numSortBuffers;
this.sortReadMemory = new ArrayList<>();
List<MemorySegment> readMemory;
try {
readMemory = memoryManager.allocatePages(owner, sortMemPages);
} catch (MemoryAllocationException e) {
// an allocation failure is logged and rethrown as an unchecked exception
LOG.error("Can't allocate {} pages from fixed memory pool.", sortMemPages, e);
throw new RuntimeException(e);
this.fixedReadMemoryNum = readMemory.size();
// circular circularQueues pass buffers between the threads
final CircularQueues circularQueues = new CircularQueues();
// allocate the sort buffers and fill empty queue with them
final Iterator<MemorySegment> segments = readMemory.iterator();
// page counts derived from the byte-sized arguments: pages per additional request, and the
// pages allowed beyond the reserved ones, split evenly across the sort buffers
final int perRequestBuffersNum = (int) (perRequestMemorySize / memoryManager.getPageSize());
final int additionalLimitNumPages =
(int) ((maxMemorySize - reservedMemorySize) / memoryManager.getPageSize());
// NOTE(review): a log statement appears to have been fused onto the next line; the call that
// the string literal and its arguments belong to is missing from this text.
final int eachBufferAdditionalLimitNumPages = (int) (additionalLimitNumPages / numSortBuffers);"BinaryExternalSorter with initial memory segments {},And the preferred memory {} segments, " +
"per request {} segments from floating memory pool, maxNumFileHandles({})," +
" compressionEnable({}), compressionCodecFactory({}), compressionBlockSize({}).", sortMemPages,
(int) (maxMemorySize / memoryManager.getPageSize()), perRequestBuffersNum, maxNumFileHandles,
compressionEnable, compressionEnable ? compressionCodecFactory.getClass() : null, compressionBlockSize);
this.sortBuffers = new ArrayList<>();
for (int i = 0; i < numSortBuffers; i++) {
// grab some memory
final List<MemorySegment> sortSegments = new ArrayList<>(numSegmentsPerSortBuffer);
// the last buffer uses Integer.MAX_VALUE as its limit, so it takes every remaining segment
for (int k = (i == numSortBuffers - 1 ? Integer.MAX_VALUE : numSegmentsPerSortBuffer); k > 0 && segments
.hasNext(); k--) {
final BinaryInMemorySortBuffer buffer = BinaryInMemorySortBuffer.createBuffer(memoryManager,
normalizedKeyComputer, inputSerializer, serializer, comparator, sortSegments,
eachBufferAdditionalLimitNumPages, perRequestBuffersNum);
// add to empty queue
CircularElement element = new CircularElement(i, buffer, sortSegments);
// exception handling
ExceptionHandler<IOException> exceptionHandler = exception -> {
// forward exception
if (!closed) {
// init adding currWriteBuffer
this.exceptionHandler = exceptionHandler;
this.circularQueues = circularQueues;
// spilling threshold in bytes: the given fraction of the page-aligned sort memory
bytesUntilSpilling = ((long) (startSpillingFraction * sortMemory));
// check if we should directly spill
if (bytesUntilSpilling < 1) {
bytesUntilSpilling = 0;
// add the spilling marker
this.channelManager = new SpillChannelManager();
// the merger receives its own duplicate of the serializer
this.merger = new BinaryExternalMerger(
ioManager, memoryManager.getPageSize(),
maxNumFileHandles, channelManager,
(BinaryRowSerializer) serializer.duplicate(), comparator,
compressionEnable, compressionCodecFactory, compressionBlockSize);
// start the thread that sorts the buffers
// NOTE(review): the visible code only creates the three threads here; the visible null-checks
// in startThreads() suggest they are started there — confirm.
this.sortThread = getSortingThread(exceptionHandler, circularQueues);
// start the thread that handles spilling to secondary storage
this.spillThread = getSpillingThread(
exceptionHandler, circularQueues, ioManager,
(BinaryRowSerializer) serializer.duplicate(), comparator);
// start the thread that handles merging from second storage
this.mergeThread = getMergingThread(
exceptionHandler, circularQueues, ioManager, maxNumFileHandles, merger);
// propagate the context class loader to the spawned threads
ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
if (contextLoader != null) {
if (this.sortThread != null) {
if (this.spillThread != null) {
if (this.mergeThread != null) {
// ------------------------------------------------------------------------
// Factory Methods
// ------------------------------------------------------------------------
* Starts all the threads that are used by this sorter.
// Handles each of the three worker threads, but only if it was actually created.
// NOTE(review): the statements guarded by these null-checks (presumably the start() calls) and
// the closing braces are missing from this text.
public void startThreads() {
if (this.sortThread != null) {
if (this.spillThread != null) {
if (this.mergeThread != null) {
/**
 * Shuts down all the threads initiated by this sorter. Also releases all previously allocated
 * memory, if it has not yet been released by the threads, and closes and deletes all channels
 * (removing the temporary files).
 *
 * <p>The threads are set to exit directly, but depending on their operation, it may take a
 * while to actually happen. The sorting thread will for example not finish before the current
 * batch is sorted. This method attempts to wait for the working threads to exit. If it is
 * however interrupted, the method exits immediately and there is no guarantee on how long the
 * threads continue to exist and occupy resources afterwards.
 */
// Closes the sorter: marks it closed, publishes a "closed" exception for anyone waiting on the
// result iterator, deals with the three worker threads and finally drops the references that
// keep memory segments reachable.
// NOTE(review): closing braces and several statements (the early return for an already closed
// sorter, the bodies of the per-thread try blocks, the statements before each "= null") are
// missing from this text.
public void close() {
// check if the sorter has been closed before
synchronized (this) {
if (this.closed) {
// mark as closed
this.closed = true;
// from here on, the code is in a try block, because even though errors might be thrown in this block,
// we need to make sure that all the memory is released.
try {
// if the result iterator has not been obtained yet, set the exception
synchronized (this.iteratorLock) {
if (this.iteratorException == null) {
this.iteratorException = new IOException("The sorter has been closed.");
// stop all the threads
// (a failure while stopping one thread is only logged, so the remaining threads are still handled)
if (this.sortThread != null) {
try {
} catch (Throwable t) {
LOG.error("Error shutting down sorter thread: " + t.getMessage(), t);
if (this.spillThread != null) {
try {
} catch (Throwable t) {
LOG.error("Error shutting down spilling thread: " + t.getMessage(), t);
if (this.mergeThread != null) {
try {
} catch (Throwable t) {
LOG.error("Error shutting down merging thread: " + t.getMessage(), t);
try {
if (this.sortThread != null) {
this.sortThread = null;
if (this.spillThread != null) {
this.spillThread = null;
if (this.mergeThread != null) {
this.mergeThread = null;
} catch (InterruptedException iex) {
// an interrupt while closing is logged at debug level only
LOG.debug("Closing of sort/merger was interrupted. " +
"The reading/sorting/spilling/merging threads may still be working.", iex);
} finally {
// Eliminate object references for MemorySegments.
circularQueues = null;
currWriteBuffer = null;
iterator = null;
// Releases the sort memory; anything thrown while doing so is caught as Throwable.
// NOTE(review): the body of the try block and the call that the fragment after the catch
// belongs to (it looks like a log call) are missing from this text.
private void releaseSortMemory() {
// RELEASE ALL MEMORY. If the threads and channels are still running, this should cause
// exceptions, because their memory segments are freed
try {
// floating segments are released in `dispose()` method
} catch (Throwable ignored) {"error.", ignored);
// Walks over the segment lists in sortReadMemory and uses a local coreSegments list; the
// release step is guarded by a non-empty check and by a catch of Throwable.
// NOTE(review): the loop body, the statement inside the non-empty check and the call that the
// fragment after the catch belongs to are missing from this text.
private void releaseCoreSegments() {
// NOTE: This method can only be called after disposing some buffers
List<MemorySegment> coreSegments = new ArrayList<>();
for (List<MemorySegment> segs : sortReadMemory) {
try {
if (!coreSegments.isEmpty()) {
} catch (Throwable ignored) {"error.", ignored);
// Factory method for the sorting thread; protected, so a subclass can supply its own thread.
// Returns a new SortingThread wired to the given exception handler and queues.
protected ThreadBase getSortingThread(ExceptionHandler<IOException> exceptionHandler,
CircularQueues queues) {
return new SortingThread(exceptionHandler, queues);
// Factory method for the spilling thread; protected, so a subclass can supply its own thread.
// Returns a new SpillingThread built from the given handler, queues, I/O manager, serializer
// and comparator.
protected SpillingThread getSpillingThread(ExceptionHandler<IOException> exceptionHandler,
CircularQueues queues, IOManager ioManager,
BinaryRowSerializer serializer, RecordComparator comparator) {
return new SpillingThread(exceptionHandler, queues, ioManager, serializer, comparator);
// Factory method for the merging thread; protected, so a subclass can supply its own thread.
// Returns a new MergingThread built from the given handler, queues, I/O manager, file handle
// limit and merger.
protected MergingThread getMergingThread(
ExceptionHandler<IOException> exceptionHandler,
CircularQueues queues, IOManager ioManager,
int maxNumFileHandles, BinaryExternalMerger merger) {
return new MergingThread(exceptionHandler, queues, ioManager, maxNumFileHandles, merger);
// Writes one row into the current in-memory sort buffer. If no buffer is held, one is polled
// from the empty queue (waiting up to one second per attempt). A full buffer is given up and the
// same row is retried on the next buffer; a row that does not even fit into an empty buffer
// fails with an IOException. Any Throwable raised inside is wrapped in an IOException and
// rethrown.
// NOTE(review): closing braces and several statements (loop exits/continues, the queue hand-offs
// for full buffers and markers, the handler call in the catch block, and the buffer id in the
// debug messages) are missing from this text.
public void write(BaseRow current) throws IOException {
checkArgument(!writingDone, "Adding already done!");
try {
while (true) {
if (closed) {
throw new IOException("Already closed!", iteratorException);
synchronized (writeLock) {
// grab the next buffer
if (currWriteBuffer == null) {
try {
currWriteBuffer = this.circularQueues.empty.poll(1, TimeUnit.SECONDS);
if (currWriteBuffer == null) {
// maybe something happened, release lock.
if (!currWriteBuffer.buffer.isEmpty()) {
throw new IOException("New buffer is not empty.");
} catch (InterruptedException iex) {
throw new IOException(iex);
final BinaryInMemorySortBuffer buffer = currWriteBuffer.buffer;
if (LOG.isDebugEnabled()) {
LOG.debug("Retrieved empty read buffer " + + ".");
// occupancy before the write, used below to compute how many bytes this row added
long occupancy = buffer.getOccupancy();
if (!buffer.write(current)) {
if (buffer.isEmpty()) {
// did not fit in a fresh buffer, must be large...
throw new IOException("The record exceeds the maximum size of a sort buffer (current maximum: "
+ buffer.getCapacity() + " bytes).");
} else {
// buffer is full, send the buffer
if (LOG.isDebugEnabled()) {
LOG.debug("Emitting full buffer: " + + ".");
// Deadlocks may occur when there are fewer MemorySegments, because of
// the fragmentation of buffer.getOccupancy ().
if (bytesUntilSpilling > 0 && circularQueues.empty.size() == 0) {
bytesUntilSpilling = 0;
currWriteBuffer = null;
// continue to process current record.
} else {
// successfully added record
// it may be that the last currWriteBuffer would have crossed the
// spilling threshold, so check it
if (bytesUntilSpilling > 0) {
bytesUntilSpilling -= buffer.getOccupancy() - occupancy;
if (bytesUntilSpilling <= 0) {
bytesUntilSpilling = 0;
} catch (Throwable e) {
// everything is normalized to an IOException with the original Throwable as its cause
IOException ioe = new IOException(e);
if (this.exceptionHandler != null) {
throw ioe;
// Bulk variant of write: loops over the given iterator, starting from a row instance created by
// this sorter's serializer.
// NOTE(review): the right-hand side of the assignment in the loop condition (presumably a call
// on the iterator that reuses `row`), the loop body and the closing braces are missing from
// this text.
public void write(MutableObjectIterator<BinaryRow> iterator) throws IOException {
BinaryRow row = serializer.createInstance();
while ((row = != null) {
// This sorter does not provide remaining sorted data files: the method always returns null
// (not an empty list), so callers must null-check the result.
public List<SortedDataFile<BinaryRow>> getRemainingSortedDataFiles() throws InterruptedException {
return null;
// Ends the writing phase on the first call (sets writingDone) and then, under iteratorLock,
// loops until either the result iterator or an iterator exception has been published.
// Returns the iterator, or throws a RuntimeException if an exception was published instead.
// NOTE(review): the hand-off of the last write buffer and the EOF marker, the body of the
// waiting loop (presumably a wait on iteratorLock), the tail of the RuntimeException
// constructor call and the closing braces are missing from this text.
public MutableObjectIterator<BinaryRow> getIterator() throws InterruptedException {
if (!writingDone) {
writingDone = true;
if (currWriteBuffer != null) {
// add the sentinel to notify the receivers that the work is done
// send the EOF marker
LOG.debug("Sending done.");
synchronized (this.iteratorLock) {
// wait while both the iterator and the exception are not set
while (this.iterator == null && this.iteratorException == null) {
if (this.iteratorException != null) {
throw new RuntimeException("Error obtaining the sorted input: " + this.iteratorException.getMessage(),
} else {
return this.iterator;
// ------------------------------------------------------------------------
// Inter-Thread Communication
// ------------------------------------------------------------------------
/**
 * Sets the result iterator. By setting the result iterator, all threads that are waiting for
 * the result iterator are notified and will obtain it.
 *
 * @param iterator The result iterator to set.
 */
// Publishes the result iterator under iteratorLock, unless an exception was recorded first.
// NOTE(review): no notification of waiting threads is visible here although the documentation
// mentions one; that statement and the closing braces appear to be missing from this text.
protected final void setResultIterator(MutableObjectIterator<BinaryRow> iterator) {
synchronized (this.iteratorLock) {
// set the result iterator only, if no exception has occurred
if (this.iteratorException == null) {
this.iterator = iterator;
* Reports an exception to all threads that are waiting for the result iterator.
* @param ioex The exception to be reported to the threads that wait for the result iterator.
// Records the given exception under iteratorLock; only the first reported exception is kept,
// later ones are ignored.
// NOTE(review): no notification of waiting threads is visible here; that statement and the
// closing braces appear to be missing from this text.
protected final void setResultIteratorException(IOException ioex) {
synchronized (this.iteratorLock) {
if (this.iteratorException == null) {
this.iteratorException = ioex;
* Class representing buffers that circulate between the reading, sorting and spilling thread.
// Holder for one sort buffer together with its memory segments and a numeric id.
// The no-argument constructor builds an element without buffer and memory; the file uses such
// instances as the EOF_MARKER and SPILLING_MARKER sentinels.
// NOTE(review): in both constructors the assignment target before "= -1" / "= id" is missing
// from this text (presumably this.id, since the final field id is otherwise never assigned);
// the closing braces are missing as well.
protected static final class CircularElement {
final int id; // just for debug.
final BinaryInMemorySortBuffer buffer;
final List<MemorySegment> memory; // for release memory
public CircularElement() { = -1;
this.buffer = null;
this.memory = null;
public CircularElement(int id, BinaryInMemorySortBuffer buffer, List<MemorySegment> memory) { = id;
this.buffer = buffer;
this.memory = memory;
* Collection of circularQueues that are used for the communication between the threads.
// Bundle of the four queues the threads exchange work through; each one is a
// LinkedBlockingQueue created without a capacity argument.
protected static final class CircularQueues {
// buffers that are available for writing (polled by write(BaseRow))
final BlockingQueue<CircularElement> empty;
// buffers taken by the sorting thread
final BlockingQueue<CircularElement> sort;
// buffers taken by the spilling thread
final BlockingQueue<CircularElement> spill;
// spill file descriptors taken by the merging thread
final BlockingQueue<ChannelWithMeta> merge;
protected CircularQueues() {
this.empty = new LinkedBlockingQueue<>();
this.sort = new LinkedBlockingQueue<>();
this.spill = new LinkedBlockingQueue<>();
this.merge = new LinkedBlockingQueue<>();
// ------------------------------------------------------------------------
// Threads
// ------------------------------------------------------------------------
/**
 * Base class for all working threads in this sorter. The specific threads for sorting,
 * spilling, etc... extend this class.
 *
 * <p>The threads are designed to terminate themselves when the task they are set up to do is
 * completed. Furthermore, they terminate immediately when the <code>shutdown()</code> method
 * is called.
 */
protected abstract static class ThreadBase extends Thread implements Thread.UncaughtExceptionHandler {
* The queue of empty buffer that can be used for reading.
protected final CircularQueues queues;
* The exception handler for any problems.
private final ExceptionHandler<IOException> exceptionHandler;
* The flag marking this thread as alive.
private volatile boolean alive;
* Creates a new thread.
* @param exceptionHandler The exception handler to call for all exceptions.
* @param name The name of the thread.
* @param queues The circularQueues used to pass buffers between the threads.
// Stores the exception handler and the queues, and flags the new thread as alive.
// NOTE(review): the statements that belonged under "thread setup" are missing from this text;
// no visible line uses the `name` parameter.
protected ThreadBase(ExceptionHandler<IOException> exceptionHandler, String name,
CircularQueues queues) {
// thread setup
// exception handling
this.exceptionHandler = exceptionHandler;
this.queues = queues;
// a thread counts as running from construction until shutdown() clears this flag
this.alive = true;
* Implements exception handling and delegates to go().
// Thread entry point. Any Throwable caught here is wrapped in an IOException that names the
// thread and keeps the original as its cause, then passed to internalHandleException.
// NOTE(review): the body of the try block (presumably the call to go()) and the closing braces
// are missing from this text.
public void run() {
try {
} catch (Throwable t) {
internalHandleException(new IOException("Thread '" + getName() + "' terminated due to an exception: "
+ t.getMessage(), t));
* Equivalent to the run() method.
* @throws IOException Exceptions that prohibit correct completion of the work may be thrown
* by the thread.
// The actual work of the thread; implemented by SortingThread, SpillingThread and MergingThread.
protected abstract void go() throws IOException;
* Checks whether this thread is still alive.
* @return true, if the thread is alive, false otherwise.
// Returns the volatile alive flag: true from construction until shutdown() is called.
// This is the sorter's own flag, not Thread.isAlive().
public boolean isRunning() {
return this.alive;
/**
 * Forces an immediate shutdown of the thread. Loses any state and all buffers that the
 * thread is currently working on. This terminates cleanly for the JVM, but loses
 * intermediate results.
 */
// Clears the alive flag so that loops guarded by isRunning() stop.
// NOTE(review): further statements may be missing from this text (for example an interrupt of
// the thread) — confirm against the original file.
public void shutdown() {
this.alive = false;
* Internally handles an exception and makes sure that this method returns without a
* problem.
* @param ioex The exception to handle.
// Passes an exception on to the configured handler, if there is one. Exceptions that arrive
// after shutdown are discarded, and anything thrown while handling is swallowed so that this
// method itself never fails.
// NOTE(review): the early return, the handler call inside the try block and the closing braces
// are missing from this text.
protected final void internalHandleException(IOException ioex) {
if (!isRunning()) {
// discard any exception that occurs after the thread is killed.
if (this.exceptionHandler != null) {
try {
} catch (Throwable ignored) {
// Thread.UncaughtExceptionHandler callback: wraps the uncaught Throwable in an IOException that
// names the failing thread and keeps the original as cause, then passes it to
// internalHandleException.
public void uncaughtException(Thread t, Throwable e) {
internalHandleException(new IOException("Thread '" + t.getName()
+ "' terminated due to an uncaught exception: " + e.getMessage(), e));
* The thread that sorts filled buffers.
// Worker that takes elements from the sort queue and processes them until it sees the
// EOF_MARKER or is shut down. It owns a QuickSort instance as its IndexedSorter.
// NOTE(review): closing braces and several statements (the log call around the interruption
// message, the sort call itself, the hand-over to the next queue, and the buffer id in the debug
// messages) are missing from this text.
protected static class SortingThread extends ThreadBase {
private final IndexedSorter sorter;
* Creates a new sorting thread.
* @param exceptionHandler The exception handler to call for all exceptions.
* @param queues The circularQueues used to pass buffers between the threads.
public SortingThread(ExceptionHandler<IOException> exceptionHandler,
CircularQueues queues) {
super(exceptionHandler, "SortMerger sorting thread", queues);
// members
this.sorter = new QuickSort();
* Entry point of the thread.
public void go() throws IOException {
// local loop flag, cleared once the EOF marker has been seen
boolean alive = true;
// loop as long as the thread is marked alive
while (isRunning() && alive) {
CircularElement element;
try {
element = this.queues.sort.take();
} catch (InterruptedException iex) {
// an interrupt is only treated as a shutdown if the thread is no longer running
if (isRunning()) {
if (LOG.isErrorEnabled()) {
"Sorting thread was interrupted (without being shut down) while grabbing a buffer. " +
"Retrying to grab buffer...");
} else {
// real buffers are handled here; the two marker elements carry no buffer
if (element != EOF_MARKER && element != SPILLING_MARKER) {
if (element.buffer.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sorting buffer " + + ".");
if (LOG.isDebugEnabled()) {
LOG.debug("Sorted buffer " + + ".");
} else if (element == EOF_MARKER) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sorting thread done.");
alive = false;
* The thread that handles the spilling of intermediate results.
protected class SpillingThread extends ThreadBase {
protected final IOManager ioManager; // I/O manager to create channels
protected final BinaryRowSerializer serializer; // The serializer for the data type
protected final RecordComparator comparator;
* Creates the spilling thread.
* @param exceptionHandler The exception handler to call for all exceptions.
* @param queues The circularQueues used to pass buffers between the threads.
* @param ioManager The I/O manager used to instantiate readers and writers from.
* @param serializer
* @param comparator
// Passes the handler, a fixed thread name and the queues to ThreadBase, then stores the I/O
// manager, serializer and comparator for use in go().
public SpillingThread(ExceptionHandler<IOException> exceptionHandler,
CircularQueues queues, IOManager ioManager,
BinaryRowSerializer serializer, RecordComparator comparator) {
super(exceptionHandler, "SortMerger spilling thread", queues);
this.ioManager = ioManager;
this.serializer = serializer;
this.comparator = comparator;
* Entry point of the thread.
// Work loop of the spilling thread, in three visible phases:
// 1. cache phase: takes elements from the spill queue until it sees SPILLING_MARKER or EOF_MARKER;
// 2. in-memory merge: if EOF arrived first (cacheOnly), the result iterator is built directly
//    from the cached buffers and nothing is written to disk;
// 3. spilling phase: otherwise buffers (cached ones first) are written to spill files, and each
//    file's metadata is put on the merge queue.
// NOTE(review): closing braces and many statements (loop exits, adding to the cache, building the
// per-buffer iterators, writing the buffer to the output view, queue hand-offs for empty buffers
// and markers, parts of the enumerator/channel initializers) are missing from this text.
public void go() throws IOException {
final Queue<CircularElement> cache = new ArrayDeque<>();
CircularElement element;
boolean cacheOnly = false;
// ------------------- In-Memory Cache ------------------------
// fill cache
while (isRunning()) {
// take next currWriteBuffer from queue
try {
element = this.queues.spill.take();
} catch (InterruptedException iex) {
// NOTE(review): the InterruptedException is not passed on as the cause of this IOException
throw new IOException("The spilling thread was interrupted.");
if (element == SPILLING_MARKER) {
} else if (element == EOF_MARKER) {
cacheOnly = true;
// check whether the thread was canceled
if (!isRunning()) {
// ------------------- In-Memory Merge ------------------------
if (cacheOnly) {
List<MutableObjectIterator<BinaryRow>> iterators = new ArrayList<>(cache.size());
for (CircularElement cached : cache) {
// set lazy iterator
List<BinaryRow> reusableEntries = new ArrayList<>();
for (int i = 0; i < iterators.size(); i++) {
// no iterator -> empty result; exactly one -> use it as is; several -> merge them
setResultIterator(iterators.isEmpty() ? EmptyMutableObjectIterator.get() :
iterators.size() == 1 ? iterators.get(0) : new BinaryMergeIterator<>(
iterators, reusableEntries, comparator::compare));
// signal merging thread to exit (because there is nothing to merge externally)
// ------------------- Spilling Phase ------------------------
final FileIOChannel.Enumerator enumerator =
// loop as long as the thread is marked alive and we do not see the final currWriteBuffer
while (isRunning()) {
try {
// TODO let cache in memory instead of disk.
// cached elements are consumed before new ones are taken from the spill queue
element = cache.isEmpty() ? queues.spill.take() : cache.poll();
} catch (InterruptedException iex) {
if (isRunning()) {
LOG.error("Spilling thread was interrupted (without being shut down) while grabbing a buffer. " +
"Retrying to grab buffer...");
} else {
// check if we are still running
if (!isRunning()) {
// check if this is the end-of-work buffer
if (element == EOF_MARKER) {
// only buffers that hold data are written to a spill file
if (element.buffer.getOccupancy() > 0) {
// open next channel
FileIOChannel.ID channel =;
AbstractChannelWriterOutputView output = null;
int bytesInLastBuffer;
int blockCount;
try {
output = FileChannelUtil.createOutputView(ioManager, channel, compressionEnable,
compressionCodecFactory, compressionBlockSize, memorySegmentSize);
// spill statistics of the enclosing sorter are accumulated from this thread
spillInBytes += output.getNumBytes();
spillInCompressedBytes += output.getNumCompressedBytes();
bytesInLastBuffer = output.close();
// NOTE(review): a log statement appears to have been fused onto the next line; the call that
// the string literal and its arguments belong to is missing from this text.
blockCount = output.getBlockCount();"here spill the {}th sort buffer data with {} bytes and {} compressed bytes",
numSpillFiles, spillInBytes, spillInCompressedBytes);
} catch (IOException e) {
if (output != null) {
throw e;
// pass spill file meta to merging thread
this.queues.merge.add(new ChannelWithMeta(channel, blockCount, bytesInLastBuffer));
// pass empty sort-buffer to reading thread
// clear the sort buffers, as both sorting and spilling threads are done.
// signal merging thread to begin the final merge
// Spilling thread done.
// Drains the empty queue, taking one element at a time until the queue reports empty.
// An interrupt while the thread is still running is logged as an error.
// NOTE(review): what is done with each taken element, the non-running branch and the closing
// braces are missing from this text.
protected final void releaseEmptyBuffers() {
while (!this.queues.empty.isEmpty()) {
try {
CircularElement element = this.queues.empty.take();
} catch (InterruptedException iex) {
if (isRunning()) {
LOG.error("Spilling thread was interrupted (without being shut down) while collecting empty " +
"buffers to release them. Retrying to collect buffers...");
} else {
* The thread that merges the intermediate spill files and the merged files
* until sufficiently few channels remain to perform the final streamed merge.
// Worker that takes spill file descriptors from the merge queue until it sees the
// FINAL_MERGE_MARKER, merges channel lists while more than maxFanIn channels remain, and then
// builds the final merging iterator over the remaining channels.
// NOTE(review): closing braces and many statements (collecting the taken channels, the
// intermediate merge call, sorting of the channels, publishing the result iterator, loop exits)
// are missing from this text.
protected class MergingThread extends ThreadBase {
protected final IOManager ioManager; // I/O manager to create channels
// upper bound on the number of channels merged at once (the maxNumFileHandles argument)
protected final int maxFanIn;
private final BinaryExternalMerger merger;
public MergingThread(
ExceptionHandler<IOException> exceptionHandler,
CircularQueues queues, IOManager ioManager,
int maxNumFileHandles, BinaryExternalMerger merger) {
super(exceptionHandler, "SortMerger merging thread", queues);
this.ioManager = ioManager;
this.maxFanIn = maxNumFileHandles;
this.merger = merger;
protected void go() throws IOException {
final List<ChannelWithMeta> spillChannelIDs = new ArrayList<>();
List<ChannelWithMeta> finalMergeChannelIDs = new ArrayList<>();
ChannelWithMeta channelID;
while (isRunning()) {
try {
channelID = this.queues.merge.take();
} catch (InterruptedException iex) {
if (isRunning()) {
LOG.error("Merging thread was interrupted (without being shut down) " +
"while grabbing a channel with meta. Retrying...");
} else {
if (!isRunning()) {
if (channelID == FINAL_MERGE_MARKER) {
// sort file channels by block numbers, to ensure a better merging performance
// if async merge is disabled, we will only do the final merge
// otherwise we wait for `maxFanIn` number of channels to begin a merge
if (!asyncMergeEnable || spillChannelIDs.size() < maxFanIn) {
// perform an intermediate merge
// check if we have spilled some data at all
if (finalMergeChannelIDs.isEmpty()) {
if (iterator == null) {
// only set the iterator if it's not set
// by the in memory merge stage of spilling thread.
} else {
// merge channels until sufficient file handles are available
while (isRunning() && finalMergeChannelIDs.size() > this.maxFanIn) {
finalMergeChannelIDs = merger.mergeChannelList(finalMergeChannelIDs);
// Beginning final merge.
// no need to call `getReadMemoryFromHeap` again,
// because `finalMergeChannelIDs` must become smaller
List<FileIOChannel> openChannels = new ArrayList<>();
// this local `iterator` shadows the enclosing sorter's iterator field
BinaryMergeIterator<BinaryRow> iterator = merger.getMergingIterator(
finalMergeChannelIDs, openChannels);
// Merging thread done.
// Returns the sum of getOccupancy() over all sort buffers registered in sortBuffers.
public long getUsedMemoryInBytes() {
long usedSizeInBytes = 0;
for (BinaryInMemorySortBuffer sortBuffer : sortBuffers) {
usedSizeInBytes += sortBuffer.getOccupancy();
return usedSizeInBytes;
// Returns the numSpillFiles counter.
// NOTE(review): the field is a plain (non-volatile) long; no visible line updates it, and the
// related spill counters are written from the spilling thread, so a value read from another
// thread may be stale — confirm whether that is acceptable for these metrics.
public long getNumSpillFiles() {
return numSpillFiles;
// Returns the accumulated spillInBytes, which SpillingThread.go() increases by
// output.getNumBytes() for every spill file it writes.
// NOTE(review): the field is a plain (non-volatile) long written from the spilling thread, so a
// value read from another thread may be stale — confirm whether that is acceptable.
public long getSpillInBytes() {
return spillInBytes;