blob: 0be312d48a9de356aad97437c6b3e023545d504e [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.spark.util.collection.unsafe.sort;
import javax.annotation.Nullable;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;
import org.apache.spark.internal.MDC;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TooLargePageException;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.Utils;
* External sorter based on {@link UnsafeInMemorySorter}.
public final class UnsafeExternalSorter extends MemoryConsumer {
private static final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);
private final PrefixComparator prefixComparator;
* {@link RecordComparator} may probably keep the reference to the records they compared last
* time, so we should not keep a {@link RecordComparator} instance inside
* {@link UnsafeExternalSorter}, because {@link UnsafeExternalSorter} is referenced by
* {@link TaskContext} and thus can not be garbage collected until the end of the task.
private final Supplier<RecordComparator> recordComparatorSupplier;
private final TaskMemoryManager taskMemoryManager;
private final BlockManager blockManager;
private final SerializerManager serializerManager;
private final TaskContext taskContext;
/** The buffer size to use when writing spills using DiskBlockObjectWriter */
private final int fileBufferSizeBytes;
* Force this sorter to spill when there are this many elements in memory.
private final int numElementsForSpillThreshold;
* Memory pages that hold the records being sorted. The pages in this list are freed when
* spilling, although in principle we could recycle these pages across spills (on the other hand,
* this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
* itself).
private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<>();
private final LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();
// These variables are reset after spilling:
@Nullable private volatile UnsafeInMemorySorter inMemSorter;
private MemoryBlock currentPage = null;
private long pageCursor = -1;
private long peakMemoryUsedBytes = 0;
private long totalSpillBytes = 0L;
private long totalSortTimeNanos = 0L;
private volatile SpillableIterator readingIterator = null;
public static UnsafeExternalSorter createWithExistingInMemorySorter(
TaskMemoryManager taskMemoryManager,
BlockManager blockManager,
SerializerManager serializerManager,
TaskContext taskContext,
Supplier<RecordComparator> recordComparatorSupplier,
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
int numElementsForSpillThreshold,
UnsafeInMemorySorter inMemorySorter,
long existingMemoryConsumption) throws IOException {
UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize,
pageSizeBytes, numElementsForSpillThreshold, inMemorySorter, false /* ignored */);
sorter.spill(Long.MAX_VALUE, sorter);
sorter.totalSpillBytes += existingMemoryConsumption;
// The external sorter will be used to insert records, in-memory sorter is not needed.
sorter.inMemSorter = null;
return sorter;
public static UnsafeExternalSorter create(
TaskMemoryManager taskMemoryManager,
BlockManager blockManager,
SerializerManager serializerManager,
TaskContext taskContext,
Supplier<RecordComparator> recordComparatorSupplier,
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
int numElementsForSpillThreshold,
boolean canUseRadixSort) {
return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
taskContext, recordComparatorSupplier, prefixComparator, initialSize, pageSizeBytes,
numElementsForSpillThreshold, null, canUseRadixSort);
private UnsafeExternalSorter(
TaskMemoryManager taskMemoryManager,
BlockManager blockManager,
SerializerManager serializerManager,
TaskContext taskContext,
Supplier<RecordComparator> recordComparatorSupplier,
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
int numElementsForSpillThreshold,
@Nullable UnsafeInMemorySorter existingInMemorySorter,
boolean canUseRadixSort) {
super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
this.taskMemoryManager = taskMemoryManager;
this.blockManager = blockManager;
this.serializerManager = serializerManager;
this.taskContext = taskContext;
this.recordComparatorSupplier = recordComparatorSupplier;
this.prefixComparator = prefixComparator;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024
this.fileBufferSizeBytes = 32 * 1024;
if (existingInMemorySorter == null) {
RecordComparator comparator = null;
if (recordComparatorSupplier != null) {
comparator = recordComparatorSupplier.get();
this.inMemSorter = new UnsafeInMemorySorter(
} else {
this.inMemSorter = existingInMemorySorter;
this.peakMemoryUsedBytes = getMemoryUsage();
this.numElementsForSpillThreshold = numElementsForSpillThreshold;
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
taskContext.addTaskCompletionListener(context -> {
* Marks the current page as no-more-space-available, and as a result, either allocate a
* new page or spill when we see the next record.
public void closeCurrentPage() {
if (currentPage != null) {
pageCursor = currentPage.getBaseOffset() + currentPage.size();
* Sort and spill the current records in response to memory pressure.
public long spill(long size, MemoryConsumer trigger) throws IOException {
if (trigger != this) {
if (readingIterator != null) {
return readingIterator.spill();
return 0L; // this should throw exception
if (inMemSorter == null || inMemSorter.numRecords() <= 0) {
// There could still be some memory allocated when there are no records in the in-memory
// sorter. We will not spill it however, to ensure that we can always process at least one
// record before spilling. See the comments in `allocateMemoryForRecordIfNecessary` for why
// this is necessary.
return 0L;
}"Thread {} spilling sort data of {} to disk ({} {} so far)",
MDC.of(LogKeys.THREAD_ID$.MODULE$, Thread.currentThread().getId()),
MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, Utils.bytesToString(getMemoryUsage())),
MDC.of(LogKeys.NUM_SPILL_WRITERS$.MODULE$, spillWriters.size()),
MDC.of(LogKeys.SPILL_TIMES$.MODULE$, spillWriters.size() > 1 ? "times" : "time"));
ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
final UnsafeSorterSpillWriter spillWriter =
new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics,
spillIterator(inMemSorter.getSortedIterator(), spillWriter);
final long spillSize = freeMemory();
// Note that this is more-or-less going to be a multiple of the page size, so wasted space in
// pages will currently be counted as memory spilled even though that space isn't actually
// written to disk. This also counts the space needed to store the sorter's pointer array.
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
// records. Otherwise, if the task is over allocated memory, then without freeing the memory
// pages, we might not be able to get memory for the pointer array.
totalSpillBytes += spillSize;
return spillSize;
* Return the total memory usage of this sorter, including the data pages and the sorter's pointer
* array.
private long getMemoryUsage() {
long totalPageSize = 0;
for (MemoryBlock page : allocatedPages) {
totalPageSize += page.size();
return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageSize;
private void updatePeakMemoryUsed() {
long mem = getMemoryUsage();
if (mem > peakMemoryUsedBytes) {
peakMemoryUsedBytes = mem;
* Return the peak memory used so far, in bytes.
public long getPeakMemoryUsedBytes() {
return peakMemoryUsedBytes;
* @return the total amount of time spent sorting data (in-memory only).
public long getSortTimeNanos() {
UnsafeInMemorySorter sorter = inMemSorter;
if (sorter != null) {
return sorter.getSortTimeNanos();
return totalSortTimeNanos;
* Return the total number of bytes that has been spilled into disk so far.
public long getSpillSize() {
return totalSpillBytes;
public int getNumberOfAllocatedPages() {
return allocatedPages.size();
* Free this sorter's data pages.
* @return the number of bytes freed.
private long freeMemory() {
List<MemoryBlock> pagesToFree = clearAndGetAllocatedPagesToFree();
long memoryFreed = 0;
for (MemoryBlock block : pagesToFree) {
memoryFreed += block.size();
return memoryFreed;
* Clear the allocated pages and return the list of allocated pages to let
* the caller free the page. This is to prevent the deadlock by nested locks
* if the caller locks the UnsafeExternalSorter and call freePage which locks the
* TaskMemoryManager and cause nested locks.
* @return list of allocated pages to free
private List<MemoryBlock> clearAndGetAllocatedPagesToFree() {
List<MemoryBlock> pagesToFree = new LinkedList<>(allocatedPages);
currentPage = null;
pageCursor = 0;
return pagesToFree;
* Deletes any spill files created by this sorter.
private void deleteSpillFiles() {
for (UnsafeSorterSpillWriter spill : spillWriters) {
File file = spill.getFile();
if (file != null && file.exists()) {
if (!file.delete()) {
logger.error("Was unable to delete spill file {}",
MDC.of(LogKeys.PATH$.MODULE$, file.getAbsolutePath()));
* Frees this sorter's in-memory data structures and cleans up its spill files.
public void cleanupResources() {
// To avoid deadlocks, we can't call methods that lock the TaskMemoryManager
// (such as various free() methods) while synchronizing on the UnsafeExternalSorter.
// Instead, we will manipulate UnsafeExternalSorter state inside the synchronized
// lock and perform the actual free() calls outside it.
UnsafeInMemorySorter inMemSorterToFree = null;
List<MemoryBlock> pagesToFree = null;
try {
synchronized (this) {
pagesToFree = clearAndGetAllocatedPagesToFree();
if (inMemSorter != null) {
inMemSorterToFree = inMemSorter;
inMemSorter = null;
} finally {
for (MemoryBlock pageToFree : pagesToFree) {
if (inMemSorterToFree != null) {
* Checks whether there is enough space to insert an additional record in to the sort pointer
* array and grows the array if additional space is required. If the required space cannot be
* obtained, then the in-memory data will be spilled to disk.
private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
if (inMemSorter.numRecords() <= 0) {
// Spilling was triggered just before this method was called. The pointer array was freed
// during the spill, so a new pointer array needs to be allocated here.
LongArray array = allocateArray(inMemSorter.getInitialSize());
long used = inMemSorter.getMemoryUsage();
LongArray array = null;
try {
// could trigger spilling
array = allocateArray(used / 8 * 2);
} catch (TooLargePageException e) {
// The pointer array is too big to fix in a single page, spill.
} catch (SparkOutOfMemoryError e) {
if (inMemSorter.numRecords() > 0) {
logger.error("Unable to grow the pointer array");
throw e;
// The new array could not be allocated, but that is not an issue as it is longer needed,
// as all records were spilled.
if (inMemSorter.numRecords() <= 0) {
// Spilling was triggered while trying to allocate the new array.
if (array != null) {
// We succeeded in allocating the new array, but, since all records were spilled, a
// smaller array would also suffice.
// The pointer array was freed during the spill, so a new pointer array needs to be
// allocated here.
array = allocateArray(inMemSorter.getInitialSize());
* Allocates an additional page in order to insert an additional record. This will request
* additional memory from the memory manager and spill if the requested memory can not be
* obtained.
* @param required the required space in the data page, in bytes, including space for storing
* the record size.
private void acquireNewPageIfNecessary(int required) {
if (currentPage == null ||
pageCursor + required > currentPage.getBaseOffset() + currentPage.size()) {
// TODO: try to find space on previous pages
currentPage = allocatePage(required);
pageCursor = currentPage.getBaseOffset();
* Allocates more memory in order to insert an additional record. This will request additional
* memory from the memory manager and spill if the requested memory can not be obtained.
* @param required the required space in the data page, in bytes, including space for storing
* the record size.
private void allocateMemoryForRecordIfNecessary(int required) throws IOException {
// Step 1:
// Ensure that the pointer array has space for another record. This may cause a spill.
// Step 2:
// Ensure that the last page has space for another record. This may cause a spill.
// Step 3:
// The allocation in step 2 could have caused a spill, which would have freed the pointer
// array allocated in step 1. Therefore we need to check again whether we have to allocate
// a new pointer array.
// If the allocation in this step causes a spill event then it will not cause the page
// allocated in the previous step to be freed. The function `spill` only frees memory if at
// least one record has been inserted in the in-memory sorter. This will not be the case if
// we have spilled in the previous step.
// If we did not spill in the previous step then `growPointerArrayIfNecessary` will be a
// no-op that does not allocate any memory, and therefore can't cause a spill event.
// Thus there is no need to call `acquireNewPageIfNecessary` again after this step.
* Write a record to the sorter.
public void insertRecord(
Object recordBase, long recordOffset, int length, long prefix, boolean prefixIsNull)
throws IOException {
assert(inMemSorter != null);
if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {"Spilling data because number of spilledRecords crossed the threshold {}",
MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD$.MODULE$, numElementsForSpillThreshold));
final int uaoSize = UnsafeAlignedOffset.getUaoSize();
// Need 4 or 8 bytes to store the record length.
final int required = length + uaoSize;
final Object base = currentPage.getBaseObject();
final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
UnsafeAlignedOffset.putSize(base, pageCursor, length);
pageCursor += uaoSize;
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
pageCursor += length;
inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull);
* Write a key-value record to the sorter. The key and value will be put together in-memory,
* using the following format:
* record length (4 bytes), key length (4 bytes), key data, value data
* record length = key length + value length + 4
public void insertKVRecord(Object keyBase, long keyOffset, int keyLen,
Object valueBase, long valueOffset, int valueLen, long prefix, boolean prefixIsNull)
throws IOException {
final int uaoSize = UnsafeAlignedOffset.getUaoSize();
final int required = keyLen + valueLen + (2 * uaoSize);
final Object base = currentPage.getBaseObject();
final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
UnsafeAlignedOffset.putSize(base, pageCursor, keyLen + valueLen + uaoSize);
pageCursor += uaoSize;
UnsafeAlignedOffset.putSize(base, pageCursor, keyLen);
pageCursor += uaoSize;
Platform.copyMemory(keyBase, keyOffset, base, pageCursor, keyLen);
pageCursor += keyLen;
Platform.copyMemory(valueBase, valueOffset, base, pageCursor, valueLen);
pageCursor += valueLen;
assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull);
* Merges another UnsafeExternalSorters into this one, the other one will be emptied.
public void merge(UnsafeExternalSorter other) throws IOException {
totalSpillBytes += other.totalSpillBytes;
// remove them from `spillWriters`, or the files will be deleted in `cleanupResources`.
* Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()`
* after consuming this iterator.
public UnsafeSorterIterator getSortedIterator() throws IOException {
assert(recordComparatorSupplier != null);
if (spillWriters.isEmpty()) {
assert(inMemSorter != null);
readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
return readingIterator;
} else {
final UnsafeSorterSpillMerger spillMerger = new UnsafeSorterSpillMerger(
recordComparatorSupplier.get(), prefixComparator, spillWriters.size());
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
if (inMemSorter != null) {
readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
return spillMerger.getSortedIterator();
@VisibleForTesting boolean hasSpaceForAnotherRecord() {
return inMemSorter.hasSpaceForAnotherRecord();
private static void spillIterator(UnsafeSorterIterator inMemIterator,
UnsafeSorterSpillWriter spillWriter) throws IOException {
while (inMemIterator.hasNext()) {
final Object baseObject = inMemIterator.getBaseObject();
final long baseOffset = inMemIterator.getBaseOffset();
final int recordLength = inMemIterator.getRecordLength();
spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix());
* An UnsafeSorterIterator that support spilling.
class SpillableIterator extends UnsafeSorterIterator {
private UnsafeSorterIterator upstream;
private MemoryBlock lastPage = null;
private boolean loaded = false;
private int numRecords;
private Object currentBaseObject;
private long currentBaseOffset;
private int currentRecordLength;
private long currentKeyPrefix;
SpillableIterator(UnsafeSorterIterator inMemIterator) {
this.upstream = inMemIterator;
this.numRecords = inMemIterator.getNumRecords();
public int getNumRecords() {
return numRecords;
public long getCurrentPageNumber() {
throw new UnsupportedOperationException();
public long spill() throws IOException {
UnsafeInMemorySorter inMemSorterToFree = null;
List<MemoryBlock> pagesToFree = new LinkedList<>();
try {
synchronized (this) {
if (inMemSorter == null) {
return 0L;
long currentPageNumber = upstream.getCurrentPageNumber();
ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
if (numRecords > 0) {
// Iterate over the records that have not been returned and spill them.
final UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter(
blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
spillIterator(upstream, spillWriter);
upstream = spillWriter.getReader(serializerManager);
} else {
// Nothing to spill as all records have been read already, but do not return yet, as the
// memory still has to be freed.
upstream = null;
long released = 0L;
synchronized (UnsafeExternalSorter.this) {
// release the pages except the one that is used. There can still be a caller that
// is accessing the current record. We free this page in that caller's next loadNext()
// call.
for (MemoryBlock page : allocatedPages) {
if (!loaded || page.pageNumber != currentPageNumber) {
released += page.size();
// Do not free the page, while we are locking `SpillableIterator`. The `freePage`
// method locks the `TaskMemoryManager`, and it's not a good idea to lock 2 objects
// in sequence. We may hit dead lock if another thread locks `TaskMemoryManager`
// and `SpillableIterator` in sequence, which may happen in
// `TaskMemoryManager.acquireExecutionMemory`.
} else {
lastPage = page;
if (lastPage != null) {
// Add the last page back to the list of allocated pages to make sure it gets freed in
// case loadNext() never gets called again.
// in-memory sorter will not be used after spilling
assert (inMemSorter != null);
released += inMemSorter.getMemoryUsage();
totalSortTimeNanos += inMemSorter.getSortTimeNanos();
// Do not free the sorter while we are locking `SpillableIterator`,
// as this can cause a deadlock.
inMemSorterToFree = inMemSorter;
inMemSorter = null;
totalSpillBytes += released;
return released;
} finally {
for (MemoryBlock pageToFree : pagesToFree) {
if (inMemSorterToFree != null) {
public boolean hasNext() {
return numRecords > 0;
public void loadNext() throws IOException {
assert upstream != null;
MemoryBlock pageToFree = null;
try {
synchronized (this) {
loaded = true;
// Just consumed the last record from the in-memory iterator.
if (lastPage != null) {
// Do not free the page here, while we are locking `SpillableIterator`. The `freePage`
// method locks the `TaskMemoryManager`, and it's a bad idea to lock 2 objects in
// sequence. We may hit dead lock if another thread locks `TaskMemoryManager` and
// `SpillableIterator` in sequence, which may happen in
// `TaskMemoryManager.acquireExecutionMemory`.
pageToFree = lastPage;
lastPage = null;
// Keep track of the current base object, base offset, record length, and key prefix,
// so that the current record can still be read in case a spill is triggered and we
// switch to the spill writer's iterator.
currentBaseObject = upstream.getBaseObject();
currentBaseOffset = upstream.getBaseOffset();
currentRecordLength = upstream.getRecordLength();
currentKeyPrefix = upstream.getKeyPrefix();
} finally {
if (pageToFree != null) {
public Object getBaseObject() {
return currentBaseObject;
public long getBaseOffset() {
return currentBaseOffset;
public int getRecordLength() {
return currentRecordLength;
public long getKeyPrefix() {
return currentKeyPrefix;
* Returns an iterator starts from startIndex, which will return the rows in the order as
* inserted.
* It is the caller's responsibility to call `cleanupResources()`
* after consuming this iterator.
* TODO: support forced spilling
public UnsafeSorterIterator getIterator(int startIndex) throws IOException {
if (spillWriters.isEmpty()) {
assert(inMemSorter != null);
UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
moveOver(iter, startIndex);
return iter;
} else {
LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
int i = 0;
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
if (i + spillWriter.recordsSpilled() > startIndex) {
UnsafeSorterIterator iter = spillWriter.getReader(serializerManager);
moveOver(iter, startIndex - i);
i += spillWriter.recordsSpilled();
if (inMemSorter != null && inMemSorter.numRecords() > 0) {
UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
moveOver(iter, startIndex - i);
return new ChainedIterator(queue);
private void moveOver(UnsafeSorterIterator iter, int steps)
throws IOException {
if (steps > 0) {
for (int i = 0; i < steps; i++) {
if (iter.hasNext()) {
} else {
throw new ArrayIndexOutOfBoundsException("Failed to move the iterator " + steps +
" steps forward");
* Chain multiple UnsafeSorterIterator together as single one.
static class ChainedIterator extends UnsafeSorterIterator implements Closeable {
private final Queue<UnsafeSorterIterator> iterators;
private UnsafeSorterIterator current;
private int numRecords;
ChainedIterator(Queue<UnsafeSorterIterator> iterators) {
assert iterators.size() > 0;
this.numRecords = 0;
for (UnsafeSorterIterator iter: iterators) {
this.numRecords += iter.getNumRecords();
this.iterators = iterators;
this.current = iterators.remove();
public int getNumRecords() {
return numRecords;
public long getCurrentPageNumber() {
return current.getCurrentPageNumber();
public boolean hasNext() {
while (!current.hasNext() && !iterators.isEmpty()) {
current = iterators.remove();
return current.hasNext();
public void loadNext() throws IOException {
while (!current.hasNext() && !iterators.isEmpty()) {
current = iterators.remove();
public Object getBaseObject() { return current.getBaseObject(); }
public long getBaseOffset() { return current.getBaseOffset(); }
public int getRecordLength() { return current.getRecordLength(); }
public long getKeyPrefix() { return current.getKeyPrefix(); }
public void close() throws IOException {
if (iterators != null && !iterators.isEmpty()) {
for (UnsafeSorterIterator iterator : iterators) {
if (current != null) {
private void closeIfPossible(UnsafeSorterIterator iterator) {
if (iterator instanceof Closeable closeable) {