| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.runtime.operators.sort; |
| |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.common.typeinfo.TypeInformation; |
| import org.apache.flink.api.common.typeutils.TypeComparator; |
| import org.apache.flink.api.common.typeutils.TypeSerializer; |
| import org.apache.flink.api.common.typeutils.base.LongSerializer; |
| import org.apache.flink.api.java.tuple.Tuple; |
| import org.apache.flink.api.java.typeutils.TypeExtractor; |
| import org.apache.flink.api.java.typeutils.runtime.TupleComparator; |
| import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; |
| import org.apache.flink.core.memory.MemorySegment; |
| import org.apache.flink.runtime.io.disk.FileChannelInputView; |
| import org.apache.flink.runtime.io.disk.FileChannelOutputView; |
| import org.apache.flink.runtime.io.disk.InputViewIterator; |
| import org.apache.flink.runtime.io.disk.SeekableFileChannelInputView; |
| import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; |
| import org.apache.flink.runtime.io.disk.iomanager.IOManager; |
| import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable; |
| import org.apache.flink.runtime.memory.MemoryAllocationException; |
| import org.apache.flink.runtime.memory.MemoryManager; |
| import org.apache.flink.types.NullKeyFieldException; |
| import org.apache.flink.util.MutableObjectIterator; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import static org.apache.flink.util.Preconditions.checkArgument; |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| public class LargeRecordHandler<T> { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(LargeRecordHandler.class); |
| |
| private static final int MIN_SEGMENTS_FOR_KEY_SPILLING = 1; |
| |
| private static final int MAX_SEGMENTS_FOR_KEY_SPILLING = 4; |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| private final TypeSerializer<T> serializer; |
| |
| private final TypeComparator<T> comparator; |
| |
| private TupleSerializer<Tuple> keySerializer; |
| |
| private TupleComparator<Tuple> keyComparator; |
| |
| private FileChannelOutputView recordsOutFile; |
| |
| private FileChannelOutputView keysOutFile; |
| |
| private Tuple keyTuple; |
| |
| private FileChannelInputView keysReader; |
| |
| private SeekableFileChannelInputView recordsReader; |
| |
| private FileIOChannel.ID recordsChannel; |
| |
| private FileIOChannel.ID keysChannel; |
| |
| private final IOManager ioManager; |
| |
| private final MemoryManager memManager; |
| |
| private final List<MemorySegment> memory; |
| |
| private Sorter<Tuple> keySorter; |
| |
| private final TaskInvokable memoryOwner; |
| |
| private long recordCounter; |
| |
| private int numKeyFields; |
| |
| private final int maxFilehandles; |
| |
| private volatile boolean closed; |
| |
| private final ExecutionConfig executionConfig; |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| public LargeRecordHandler( |
| TypeSerializer<T> serializer, |
| TypeComparator<T> comparator, |
| IOManager ioManager, |
| MemoryManager memManager, |
| List<MemorySegment> memory, |
| TaskInvokable memoryOwner, |
| int maxFilehandles, |
| ExecutionConfig executionConfig) { |
| this.serializer = checkNotNull(serializer); |
| this.comparator = checkNotNull(comparator); |
| this.ioManager = checkNotNull(ioManager); |
| this.memManager = checkNotNull(memManager); |
| this.memory = checkNotNull(memory); |
| this.memoryOwner = checkNotNull(memoryOwner); |
| this.maxFilehandles = maxFilehandles; |
| this.executionConfig = checkNotNull(executionConfig); |
| |
| checkArgument(maxFilehandles >= 2); |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| @SuppressWarnings("unchecked") |
| public long addRecord(T record) throws IOException { |
| |
| if (recordsOutFile == null) { |
| |
| if (closed) { |
| throw new IllegalStateException("The large record handler has been closed."); |
| } |
| if (recordsReader != null) { |
| throw new IllegalStateException("The handler has already switched to sorting."); |
| } |
| |
| LOG.debug("Initializing the large record spilling..."); |
| |
| // initialize the utilities |
| { |
| final TypeComparator<?>[] keyComps = comparator.getFlatComparators(); |
| numKeyFields = keyComps.length; |
| Object[] keyHolder = new Object[numKeyFields]; |
| |
| comparator.extractKeys(record, keyHolder, 0); |
| |
| TypeSerializer<?>[] keySers = new TypeSerializer<?>[numKeyFields]; |
| TypeSerializer<?>[] tupleSers = new TypeSerializer<?>[numKeyFields + 1]; |
| |
| int[] keyPos = new int[numKeyFields]; |
| |
| for (int i = 0; i < numKeyFields; i++) { |
| keyPos[i] = i; |
| keySers[i] = createSerializer(keyHolder[i], i); |
| tupleSers[i] = keySers[i]; |
| } |
| // add the long serializer for the offset |
| tupleSers[numKeyFields] = LongSerializer.INSTANCE; |
| |
| keySerializer = |
| new TupleSerializer<>( |
| (Class<Tuple>) Tuple.getTupleClass(numKeyFields + 1), tupleSers); |
| keyComparator = new TupleComparator<>(keyPos, keyComps, keySers); |
| keyTuple = keySerializer.createInstance(); |
| } |
| |
| // initialize the spilling |
| final int totalNumSegments = memory.size(); |
| final int segmentsForKeys = |
| (totalNumSegments >= 2 * MAX_SEGMENTS_FOR_KEY_SPILLING) |
| ? MAX_SEGMENTS_FOR_KEY_SPILLING |
| : Math.max( |
| MIN_SEGMENTS_FOR_KEY_SPILLING, |
| totalNumSegments - MAX_SEGMENTS_FOR_KEY_SPILLING); |
| |
| List<MemorySegment> recordsMemory = new ArrayList<MemorySegment>(); |
| List<MemorySegment> keysMemory = new ArrayList<MemorySegment>(); |
| |
| for (int i = 0; i < segmentsForKeys; i++) { |
| keysMemory.add(memory.get(i)); |
| } |
| for (int i = segmentsForKeys; i < totalNumSegments; i++) { |
| recordsMemory.add(memory.get(i)); |
| } |
| |
| recordsChannel = ioManager.createChannel(); |
| keysChannel = ioManager.createChannel(); |
| |
| recordsOutFile = |
| new FileChannelOutputView( |
| ioManager.createBlockChannelWriter(recordsChannel), |
| memManager, |
| recordsMemory, |
| memManager.getPageSize()); |
| |
| keysOutFile = |
| new FileChannelOutputView( |
| ioManager.createBlockChannelWriter(keysChannel), |
| memManager, |
| keysMemory, |
| memManager.getPageSize()); |
| } |
| |
| final long offset = recordsOutFile.getWriteOffset(); |
| if (offset < 0) { |
| throw new RuntimeException("wrong offset"); |
| } |
| |
| Object[] keyHolder = new Object[numKeyFields]; |
| |
| comparator.extractKeys(record, keyHolder, 0); |
| for (int i = 0; i < numKeyFields; i++) { |
| keyTuple.setField(keyHolder[i], i); |
| } |
| keyTuple.setField(offset, numKeyFields); |
| |
| keySerializer.serialize(keyTuple, keysOutFile); |
| serializer.serialize(record, recordsOutFile); |
| |
| recordCounter++; |
| |
| return offset; |
| } |
| |
| public MutableObjectIterator<T> finishWriteAndSortKeys(List<MemorySegment> memory) |
| throws IOException { |
| if (recordsOutFile == null || keysOutFile == null) { |
| throw new IllegalStateException("The LargeRecordHandler has not spilled any records"); |
| } |
| |
| // close the writers and |
| final int lastBlockBytesKeys; |
| final int lastBlockBytesRecords; |
| |
| recordsOutFile.close(); |
| keysOutFile.close(); |
| lastBlockBytesKeys = keysOutFile.getBytesInLatestSegment(); |
| lastBlockBytesRecords = recordsOutFile.getBytesInLatestSegment(); |
| recordsOutFile = null; |
| keysOutFile = null; |
| |
| final int pagesForReaders = |
| Math.max( |
| 3 * MIN_SEGMENTS_FOR_KEY_SPILLING, |
| Math.min(2 * MAX_SEGMENTS_FOR_KEY_SPILLING, memory.size() / 50)); |
| final int pagesForKeyReader = |
| Math.min( |
| pagesForReaders - MIN_SEGMENTS_FOR_KEY_SPILLING, |
| MAX_SEGMENTS_FOR_KEY_SPILLING); |
| final int pagesForRecordReader = pagesForReaders - pagesForKeyReader; |
| |
| // grab memory for the record reader |
| ArrayList<MemorySegment> memForRecordReader = new ArrayList<MemorySegment>(); |
| ArrayList<MemorySegment> memForKeysReader = new ArrayList<MemorySegment>(); |
| |
| for (int i = 0; i < pagesForRecordReader; i++) { |
| memForRecordReader.add(memory.remove(memory.size() - 1)); |
| } |
| for (int i = 0; i < pagesForKeyReader; i++) { |
| memForKeysReader.add(memory.remove(memory.size() - 1)); |
| } |
| |
| keysReader = |
| new FileChannelInputView( |
| ioManager.createBlockChannelReader(keysChannel), |
| memManager, |
| memForKeysReader, |
| lastBlockBytesKeys); |
| InputViewIterator<Tuple> keyIterator = |
| new InputViewIterator<Tuple>(keysReader, keySerializer); |
| |
| try { |
| keySorter = |
| ExternalSorter.newBuilder( |
| memManager, |
| memoryOwner, |
| keySerializer, |
| keyComparator, |
| executionConfig) |
| .maxNumFileHandles(maxFilehandles) |
| .sortBuffers(1) |
| .enableSpilling(ioManager, 1.0f) |
| .memory(memory) |
| .objectReuse(this.executionConfig.isObjectReuseEnabled()) |
| .largeRecords(false) |
| .build(keyIterator); |
| } catch (MemoryAllocationException e) { |
| throw new IllegalStateException( |
| "We should not try allocating memory. Instead the sorter should use the provided memory.", |
| e); |
| } |
| |
| // wait for the sorter to sort the keys |
| MutableObjectIterator<Tuple> result; |
| try { |
| result = keySorter.getIterator(); |
| } catch (InterruptedException e) { |
| throw new IOException(e); |
| } |
| |
| recordsReader = |
| new SeekableFileChannelInputView( |
| ioManager, |
| recordsChannel, |
| memManager, |
| memForRecordReader, |
| lastBlockBytesRecords); |
| |
| return new FetchingIterator<T>( |
| serializer, result, recordsReader, keySerializer, numKeyFields); |
| } |
| |
| /** |
| * Closes all structures and deletes all temporary files. Even in the presence of failures, this |
| * method will try and continue closing files and deleting temporary files. |
| * |
| * @throws IOException Thrown if an error occurred while closing/deleting the files. |
| */ |
| public void close() throws IOException { |
| |
| // we go on closing and deleting files in the presence of failures. |
| // we remember the first exception to occur and re-throw it later |
| Throwable ex = null; |
| |
| synchronized (this) { |
| if (closed) { |
| return; |
| } |
| closed = true; |
| |
| // close the writers |
| if (recordsOutFile != null) { |
| try { |
| recordsOutFile.close(); |
| recordsOutFile = null; |
| } catch (Throwable t) { |
| LOG.error("Cannot close the large records spill file.", t); |
| ex = t; |
| } |
| } |
| if (keysOutFile != null) { |
| try { |
| keysOutFile.close(); |
| keysOutFile = null; |
| } catch (Throwable t) { |
| LOG.error("Cannot close the large records key spill file.", t); |
| ex = ex == null ? t : ex; |
| } |
| } |
| |
| // close the readers |
| if (recordsReader != null) { |
| try { |
| recordsReader.close(); |
| recordsReader = null; |
| } catch (Throwable t) { |
| LOG.error("Cannot close the large records reader.", t); |
| ex = ex == null ? t : ex; |
| } |
| } |
| if (keysReader != null) { |
| try { |
| keysReader.close(); |
| keysReader = null; |
| } catch (Throwable t) { |
| LOG.error("Cannot close the large records key reader.", t); |
| ex = ex == null ? t : ex; |
| } |
| } |
| |
| // delete the spill files |
| if (recordsChannel != null) { |
| try { |
| ioManager.deleteChannel(recordsChannel); |
| recordsChannel = null; |
| } catch (Throwable t) { |
| LOG.error("Cannot delete the large records spill file.", t); |
| ex = ex == null ? t : ex; |
| } |
| } |
| if (keysChannel != null) { |
| try { |
| ioManager.deleteChannel(keysChannel); |
| keysChannel = null; |
| } catch (Throwable t) { |
| LOG.error("Cannot delete the large records key spill file.", t); |
| ex = ex == null ? t : ex; |
| } |
| } |
| |
| // close the key sorter |
| if (keySorter != null) { |
| try { |
| keySorter.close(); |
| keySorter = null; |
| } catch (Throwable t) { |
| LOG.error( |
| "Cannot properly dispose the key sorter and clean up its temporary files.", |
| t); |
| ex = ex == null ? t : ex; |
| } |
| } |
| |
| memManager.release(memory); |
| |
| recordCounter = 0; |
| } |
| |
| // re-throw the exception, if necessary |
| if (ex != null) { |
| throw new IOException( |
| "An error occurred cleaning up spill files in the large record handler.", ex); |
| } |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| public boolean hasData() { |
| return recordCounter > 0; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| private TypeSerializer<Object> createSerializer(Object key, int pos) { |
| if (key == null) { |
| throw new NullKeyFieldException(pos); |
| } |
| try { |
| TypeInformation<Object> info = TypeExtractor.getForObject(key); |
| return info.createSerializer(executionConfig); |
| } catch (Throwable t) { |
| throw new RuntimeException("Could not create key serializer for type " + key); |
| } |
| } |
| |
| private static final class FetchingIterator<T> implements MutableObjectIterator<T> { |
| |
| private final TypeSerializer<T> serializer; |
| |
| private final MutableObjectIterator<Tuple> tupleInput; |
| |
| private final SeekableFileChannelInputView recordsInputs; |
| |
| private Tuple value; |
| |
| private final int pointerPos; |
| |
| public FetchingIterator( |
| TypeSerializer<T> serializer, |
| MutableObjectIterator<Tuple> tupleInput, |
| SeekableFileChannelInputView recordsInputs, |
| TypeSerializer<Tuple> tupleSerializer, |
| int pointerPos) { |
| this.serializer = serializer; |
| this.tupleInput = tupleInput; |
| this.recordsInputs = recordsInputs; |
| this.pointerPos = pointerPos; |
| |
| this.value = tupleSerializer.createInstance(); |
| } |
| |
| @Override |
| public T next(T reuse) throws IOException { |
| return next(); |
| } |
| |
| @Override |
| public T next() throws IOException { |
| Tuple value = tupleInput.next(this.value); |
| if (value != null) { |
| this.value = value; |
| long pointer = value.<Long>getField(pointerPos); |
| |
| recordsInputs.seek(pointer); |
| return serializer.deserialize(recordsInputs); |
| } else { |
| return null; |
| } |
| } |
| } |
| } |