| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.tez.runtime.library.common.sort.impl; |
| |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.nio.BufferOverflowException; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.nio.IntBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.PriorityQueue; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.zip.Deflater; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.apache.tez.common.Preconditions; |
| import com.google.common.collect.Lists; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.runtime.library.api.IOInterruptedException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.DataInputBuffer; |
| import org.apache.tez.common.TezUtilsInternal; |
| import org.apache.tez.common.CallableWithNdc; |
| import org.apache.tez.common.io.NonSyncDataOutputStream; |
| import org.apache.tez.runtime.api.Event; |
| import org.apache.tez.runtime.library.common.comparator.ProxyComparator; |
| import org.apache.hadoop.io.RawComparator; |
| import org.apache.hadoop.util.IndexedSortable; |
| import org.apache.hadoop.util.IndexedSorter; |
| import org.apache.hadoop.util.Progress; |
| import org.apache.tez.common.TezCommonUtils; |
| import org.apache.tez.runtime.api.OutputContext; |
| import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; |
| import org.apache.tez.runtime.library.common.ConfigUtils; |
| import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; |
| import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; |
| import org.apache.tez.runtime.library.common.sort.impl.TezMerger.DiskSegment; |
| import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment; |
| import org.apache.tez.runtime.library.utils.LocalProgress; |
| import org.apache.tez.util.StopWatch; |
| |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions; |
| |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| public class PipelinedSorter extends ExternalSorter { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(PipelinedSorter.class); |
| |
| /** |
| * The size of each record in the index file for the map-outputs. |
| */ |
| public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24; |
| |
| private final static int APPROX_HEADER_LENGTH = 150; |
| |
| private final int partitionBits; |
| |
| private static final int PARTITION = 0; // partition offset in acct |
| private static final int KEYSTART = 1; // key offset in acct |
| private static final int VALSTART = 2; // val offset in acct |
| private static final int VALLEN = 3; // val len in acct |
| private static final int NMETA = 4; // num meta ints |
| private static final int METASIZE = NMETA * 4; // size in bytes |
| |
| private final int minSpillsForCombine; |
| private final ProxyComparator hasher; |
| // SortSpans |
| private SortSpan span; |
| |
| //total memory capacity allocated to sorter |
| private final long capacity; |
| |
| //track buffer overflow recursively in all buffers |
| private int bufferOverflowRecursion; |
| |
| // Merger |
| private final SpanMerger merger; |
| private final ExecutorService sortmaster; |
| |
| private final ArrayList<TezSpillRecord> indexCacheList = |
| new ArrayList<TezSpillRecord>(); |
| |
| private final boolean pipelinedShuffle; |
| |
| private long currentAllocatableMemory; |
| //Maintain a list of ByteBuffers |
| @VisibleForTesting |
| final List<ByteBuffer> buffers; |
| @VisibleForTesting |
| List<Integer> bufferUsage; |
| final int maxNumberOfBlocks; |
| private int bufferIndex = -1; |
| private final int MIN_BLOCK_SIZE; |
| private final boolean lazyAllocateMem; |
| private final Deflater deflater; |
| private final String auxiliaryService; |
| |
| /** |
| * Store the events to be send in close. |
| */ |
| private final List<Event> finalEvents; |
| |
| // TODO Set additional countesr - total bytes written, spills etc. |
| |
| public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs, |
| long initialMemoryAvailable) throws IOException { |
| super(outputContext, conf, numOutputs, initialMemoryAvailable); |
| |
| lazyAllocateMem = this.conf.getBoolean(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY_DEFAULT); |
| |
| if (lazyAllocateMem) { |
| /** |
| * When lazy-allocation is enabled, framework takes care of auto |
| * allocating memory on need basis. Desirable block size is set to 256MB |
| */ |
| //256 MB - 64 bytes. See comment for the 32MB allocation. |
| MIN_BLOCK_SIZE = ((256 << 20) - 64); |
| } else { |
| int minBlockSize = conf.getInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, |
| TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT); |
| Preconditions.checkArgument( |
| (minBlockSize > 0 && minBlockSize < 2047), |
| TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB |
| + "=" + minBlockSize + " should be a positive value between 0 and 2047"); |
| MIN_BLOCK_SIZE = minBlockSize << 20; |
| } |
| |
| StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ") |
| .append(outputContext.getInputOutputVertexNames()).append(": "); |
| partitionBits = bitcount(partitions)+1; |
| |
| boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT); |
| |
| pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle; |
| auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, |
| TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); |
| //sanity checks |
| final long sortmb = this.availableMemoryMb; |
| |
| // buffers and accounting |
| long maxMemLimit = sortmb << 20; |
| |
| initialSetupLogLine.append(", UsingHashComparator="); |
| // k/v serialization |
| if(comparator instanceof ProxyComparator) { |
| hasher = (ProxyComparator)comparator; |
| initialSetupLogLine.append(true); |
| } else { |
| hasher = null; |
| initialSetupLogLine.append(false); |
| } |
| |
| LOG.info(initialSetupLogLine.toString()); |
| |
| long totalCapacityWithoutMeta = 0; |
| long availableMem = maxMemLimit; |
| int numBlocks = 0; |
| while(availableMem > 0) { |
| long size = Math.min(availableMem, computeBlockSize(availableMem, maxMemLimit)); |
| int sizeWithoutMeta = (int) ((size) - (size % METASIZE)); |
| totalCapacityWithoutMeta += sizeWithoutMeta; |
| availableMem -= size; |
| numBlocks++; |
| } |
| currentAllocatableMemory = maxMemLimit; |
| maxNumberOfBlocks = numBlocks; |
| capacity = totalCapacityWithoutMeta; |
| |
| buffers = Lists.newArrayListWithCapacity(maxNumberOfBlocks); |
| bufferUsage = Lists.newArrayListWithCapacity(maxNumberOfBlocks); |
| allocateSpace(); //Allocate the first block |
| if (!lazyAllocateMem) { |
| LOG.info("Pre allocating rest of memory buffers upfront"); |
| while(allocateSpace() != null); |
| } |
| |
| initialSetupLogLine.append("#blocks=").append(maxNumberOfBlocks); |
| initialSetupLogLine.append(", maxMemUsage=").append(maxMemLimit); |
| initialSetupLogLine.append(", lazyAllocateMem=").append( |
| lazyAllocateMem); |
| initialSetupLogLine.append(", minBlockSize=").append(MIN_BLOCK_SIZE); |
| initialSetupLogLine.append(", initial BLOCK_SIZE=").append(buffers.get(0).capacity()); |
| initialSetupLogLine.append(", finalMergeEnabled=").append(isFinalMergeEnabled()); |
| initialSetupLogLine.append(", pipelinedShuffle=").append(pipelinedShuffle); |
| initialSetupLogLine.append(", sendEmptyPartitions=").append(sendEmptyPartitionDetails); |
| initialSetupLogLine.append(", ").append(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB).append( |
| "=").append( |
| sortmb); |
| |
| Preconditions.checkState(buffers.size() > 0, "Atleast one buffer needs to be present"); |
| LOG.info(initialSetupLogLine.toString()); |
| |
| span = new SortSpan(buffers.get(bufferIndex), 1024 * 1024, 16, this.comparator); |
| merger = new SpanMerger(); // SpanIterators are comparable |
| final int sortThreads = |
| this.conf.getInt( |
| TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS, |
| TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT); |
| sortmaster = Executors.newFixedThreadPool(sortThreads, |
| new ThreadFactoryBuilder().setDaemon(true) |
| .setNameFormat("Sorter {" + TezUtilsInternal.cleanVertexName(outputContext.getTaskVertexName()) + " -> " |
| + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d") |
| .build()); |
| |
| valSerializer.open(span.out); |
| keySerializer.open(span.out); |
| minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3); |
| deflater = TezCommonUtils.newBestCompressionDeflater(); |
| finalEvents = Lists.newLinkedList(); |
| } |
| |
| ByteBuffer allocateSpace() { |
| if (currentAllocatableMemory <= 0) { |
| //No space available. |
| return null; |
| } |
| |
| int size = computeBlockSize(currentAllocatableMemory, availableMemoryMb << 20); |
| currentAllocatableMemory -= size; |
| int sizeWithoutMeta = (size) - (size % METASIZE); |
| ByteBuffer space = ByteBuffer.allocate(sizeWithoutMeta); |
| |
| buffers.add(space); |
| bufferIndex++; |
| bufferUsage.add(0); |
| |
| Preconditions.checkState(buffers.size() <= maxNumberOfBlocks, |
| "Number of blocks " + buffers.size() |
| + " is exceeding " + maxNumberOfBlocks); |
| |
| LOG.info("Newly allocated block size=" + size |
| + ", index=" + bufferIndex |
| + ", Number of buffers=" + buffers.size() |
| + ", currentAllocatableMemory=" + currentAllocatableMemory |
| + ", currentBufferSize=" + space.capacity() |
| + ", total=" + (availableMemoryMb << 20)); |
| return space; |
| } |
| |
| |
| @VisibleForTesting |
| int computeBlockSize(long availableMem, long maxAllocatedMemory) { |
| int maxBlockSize = 0; |
| /** |
| * When lazy-allocation is enabled, framework takes care of auto allocating |
| * memory on need basis. In such cases, first buffer starts with 32 MB. |
| */ |
| if (lazyAllocateMem) { |
| if (buffers == null || buffers.isEmpty()) { |
| //32 MB - 64 bytes |
| // These buffers end up occupying 33554456 (32M + 24) bytes. |
| // On large JVMs (64G+), with G1Gc - the region size maxes out at |
| // 32M. Without the -64, this structure would end up using 2 regions. |
| return ((32 << 20) - 64); |
| } |
| } |
| |
| //Honor MIN_BLOCK_SIZE |
| maxBlockSize = Math.max(MIN_BLOCK_SIZE, maxBlockSize); |
| |
| if (availableMem < maxBlockSize) { |
| maxBlockSize = (int) availableMem; |
| } |
| |
| int maxMem = (maxAllocatedMemory > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) maxAllocatedMemory; |
| if (maxBlockSize > maxMem) { |
| maxBlockSize = maxMem; |
| } |
| |
| availableMem -= maxBlockSize; |
| if (availableMem < MIN_BLOCK_SIZE) { |
| if ((maxBlockSize + availableMem) < Integer.MAX_VALUE) { |
| //Merge remaining with last block |
| maxBlockSize += availableMem; |
| } |
| } |
| return maxBlockSize; |
| } |
| |
| private int bitcount(int n) { |
| int bit = 0; |
| while(n!=0) { |
| bit++; |
| n >>= 1; |
| } |
| return bit; |
| } |
| |
| public void sort() throws IOException { |
| SortSpan newSpan = span.next(); |
| |
| if(newSpan == null) { |
| //avoid sort/spill of empty span |
| StopWatch stopWatch = new StopWatch(); |
| stopWatch.start(); |
| // sort in the same thread, do not wait for the thread pool |
| merger.add(span.sort(sorter)); |
| boolean ret = spill(true); |
| stopWatch.stop(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(outputContext.getInputOutputVertexNames() + ": Time taken for spill " |
| + (stopWatch.now(TimeUnit.MILLISECONDS)) + " ms"); |
| } |
| if (pipelinedShuffle && ret) { |
| sendPipelinedShuffleEvents(); |
| } |
| // Use the next buffer |
| bufferIndex = (bufferIndex + 1) % buffers.size(); |
| bufferUsage.set(bufferIndex, bufferUsage.get(bufferIndex) + 1); |
| int items = 1024*1024; |
| int perItem = 16; |
| if(span.length() != 0) { |
| items = span.length(); |
| perItem = span.kvbuffer.limit()/items; |
| items = (int) ((span.capacity)/(METASIZE+perItem)); |
| if(items > 1024*1024) { |
| // our goal is to have 1M splits and sort early |
| items = 1024*1024; |
| } |
| } |
| Preconditions.checkArgument(buffers.get(bufferIndex) != null, "block should not be empty"); |
| //TODO: fix per item being passed. |
| span = new SortSpan((ByteBuffer)buffers.get(bufferIndex).clear(), (1024*1024), |
| perItem, ConfigUtils.getIntermediateOutputKeyComparator(this.conf)); |
| } else { |
| // queue up the sort |
| SortTask task = new SortTask(span, sorter); |
| LOG.debug("Submitting span={} for sort", span.toString()); |
| Future<SpanIterator> future = sortmaster.submit(task); |
| merger.add(future); |
| span = newSpan; |
| } |
| valSerializer.open(span.out); |
| keySerializer.open(span.out); |
| } |
| |
| // if pipelined shuffle is enabled, this method is called to send events for every spill |
| private void sendPipelinedShuffleEvents() throws IOException{ |
| List<Event> events = Lists.newLinkedList(); |
| String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1)); |
| ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, |
| outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1), |
| partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, |
| reportDetailedPartitionStats(), auxiliaryService, deflater); |
| outputContext.sendEvents(events); |
| LOG.info(outputContext.getInputOutputVertexNames() + |
| ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1)); |
| } |
| |
| @Override |
| public void write(Object key, Object value) |
| throws IOException { |
| collect( |
| key, value, partitioner.getPartition(key, value, partitions)); |
| } |
| |
| /** |
| * Serialize the key, value to intermediate storage. |
| * When this method returns, kvindex must refer to sufficient unused |
| * storage to store one METADATA. |
| */ |
| synchronized void collect(Object key, Object value, final int partition |
| ) throws IOException { |
| if (key.getClass() != serializationContext.getKeyClass()) { |
| throw new IOException("Type mismatch in key from map: expected " |
| + serializationContext.getKeyClass().getName() + ", received " |
| + key.getClass().getName()); |
| } |
| if (value.getClass() != serializationContext.getValueClass()) { |
| throw new IOException("Type mismatch in value from map: expected " |
| + serializationContext.getValueClass().getName() + ", received " |
| + value.getClass().getName()); |
| } |
| if (partition < 0 || partition >= partitions) { |
| throw new IOException("Illegal partition for " + key + " (" + |
| partition + ")"); |
| } |
| // TBD:FIX in TEZ-2574 |
| if (span.kvmeta.remaining() < METASIZE) { |
| this.sort(); |
| if (span.length() == 0) { |
| spillSingleRecord(key, value, partition); |
| return; |
| } |
| } |
| int keystart = span.kvbuffer.position(); |
| int valstart = -1; |
| int valend = -1; |
| try { |
| keySerializer.serialize(key); |
| valstart = span.kvbuffer.position(); |
| valSerializer.serialize(value); |
| valend = span.kvbuffer.position(); |
| } catch(BufferOverflowException overflow) { |
| // restore limit |
| span.kvbuffer.position(keystart); |
| this.sort(); |
| if (span.length() == 0 || bufferOverflowRecursion > buffers.size()) { |
| // spill the current key value pair |
| spillSingleRecord(key, value, partition); |
| bufferOverflowRecursion = 0; |
| return; |
| } |
| bufferOverflowRecursion++; |
| // try again |
| this.collect(key, value, partition); |
| return; |
| } |
| |
| if (bufferOverflowRecursion > 0) { |
| bufferOverflowRecursion--; |
| } |
| |
| int prefix = 0; |
| |
| if(hasher != null) { |
| prefix = hasher.getProxy(key); |
| } |
| |
| prefix = (partition << (32 - partitionBits)) | (prefix >>> partitionBits); |
| |
| /* maintain order as in PARTITION, KEYSTART, VALSTART, VALLEN */ |
| span.kvmeta.put(prefix); |
| span.kvmeta.put(keystart); |
| span.kvmeta.put(valstart); |
| span.kvmeta.put(valend - valstart); |
| mapOutputRecordCounter.increment(1); |
| outputContext.notifyProgress(); |
| mapOutputByteCounter.increment(valend - keystart); |
| } |
| |
| private void adjustSpillCounters(long rawLength, long compLength) { |
| if (!isFinalMergeEnabled()) { |
| outputBytesWithOverheadCounter.increment(rawLength); |
| } else { |
| if (numSpills > 0) { |
| additionalSpillBytesWritten.increment(compLength); |
| // Reset the value will be set during the final merge. |
| outputBytesWithOverheadCounter.setValue(0); |
| } else { |
| // Set this up for the first write only. Subsequent ones will be handled in the final merge. |
| outputBytesWithOverheadCounter.increment(rawLength); |
| } |
| } |
| } |
| |
| // it is guaranteed that when spillSingleRecord is called, there is |
| // no merger spans queued in executor. |
| private void spillSingleRecord(final Object key, final Object value, |
| int partition) throws IOException { |
| final TezSpillRecord spillRec = new TezSpillRecord(partitions); |
| // getSpillFileForWrite with size -1 as the serialized size of KV pair is still unknown |
| final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, -1); |
| Path indexFilename = |
| mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions |
| * MAP_OUTPUT_INDEX_RECORD_LENGTH); |
| spillFilePaths.put(numSpills, filename); |
| FSDataOutputStream out = rfs.create(filename, true, 4096); |
| ensureSpillFilePermissions(filename, rfs); |
| |
| try { |
| LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling to " + filename.toString() + |
| ", indexFilename=" + indexFilename); |
| for (int i = 0; i < partitions; ++i) { |
| if (isThreadInterrupted()) { |
| return; |
| } |
| Writer writer = null; |
| try { |
| long segmentStart = out.getPos(); |
| if (!sendEmptyPartitionDetails || (i == partition)) { |
| writer = new Writer(serializationContext.getKeySerialization(), |
| serializationContext.getValSerialization(), out, serializationContext.getKeyClass(), |
| serializationContext.getValueClass(), codec, spilledRecordsCounter, null, false); |
| } |
| // we need not check for combiner since its a single record |
| if (i == partition) { |
| final long recordStart = out.getPos(); |
| writer.append(key, value); |
| mapOutputRecordCounter.increment(1); |
| mapOutputByteCounter.increment(out.getPos() - recordStart); |
| } |
| long rawLength = 0; |
| long partLength = 0; |
| if (writer != null) { |
| writer.close(); |
| rawLength = writer.getRawLength(); |
| partLength = writer.getCompressedLength(); |
| } |
| adjustSpillCounters(rawLength, partLength); |
| // record offsets |
| final TezIndexRecord rec = |
| new TezIndexRecord( |
| segmentStart, rawLength, partLength); |
| spillRec.putIndex(rec, i); |
| writer = null; |
| } finally { |
| if (null != writer) { |
| writer.close(); |
| } |
| } |
| } |
| |
| spillFileIndexPaths.put(numSpills, indexFilename); |
| spillRec.writeToFile(indexFilename, conf, localFs); |
| //TODO: honor cache limits |
| indexCacheList.add(spillRec); |
| ++numSpills; |
| if (!isFinalMergeEnabled()) { |
| fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen()); |
| //No final merge. Set the number of files offered via shuffle-handler |
| numShuffleChunks.setValue(numSpills); |
| } |
| if (pipelinedShuffle) { |
| sendPipelinedShuffleEvents(); |
| } |
| } finally { |
| out.close(); |
| } |
| } |
| |
| public boolean spill(boolean ignoreEmptySpills) throws IOException { |
| FSDataOutputStream out = null; |
| try { |
| try { |
| boolean ret = merger.ready(); |
| // if merger returned false and ignore merge is true, |
| // then return directly without spilling |
| if (!ret && ignoreEmptySpills){ |
| return false; |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| LOG.info(outputContext.getInputOutputVertexNames() + ": Interrupted while waiting for mergers to complete"); |
| throw new IOInterruptedException( |
| outputContext.getInputOutputVertexNames() + ": Interrupted while waiting for mergers to complete", e); |
| } |
| |
| // create spill file |
| final long size = capacity + |
| + (partitions * APPROX_HEADER_LENGTH); |
| final TezSpillRecord spillRec = new TezSpillRecord(partitions); |
| final Path filename = |
| mapOutputFile.getSpillFileForWrite(numSpills, size); |
| spillFilePaths.put(numSpills, filename); |
| out = rfs.create(filename, true, 4096); |
| ensureSpillFilePermissions(filename, rfs); |
| LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling to " + filename.toString()); |
| for (int i = 0; i < partitions; ++i) { |
| if (isThreadInterrupted()) { |
| return false; |
| } |
| outputContext.notifyProgress(); |
| TezRawKeyValueIterator kvIter = merger.filter(i); |
| //write merged output to disk |
| long segmentStart = out.getPos(); |
| Writer writer = null; |
| boolean hasNext = kvIter.hasNext(); |
| if (hasNext || !sendEmptyPartitionDetails) { |
| writer = new Writer(serializationContext.getKeySerialization(), |
| serializationContext.getValSerialization(), out, serializationContext.getKeyClass(), |
| serializationContext.getValueClass(), codec, spilledRecordsCounter, null, |
| merger.needsRLE()); |
| } |
| if (combiner == null) { |
| while (kvIter.next()) { |
| writer.append(kvIter.getKey(), kvIter.getValue()); |
| } |
| } else { |
| if (hasNext) { |
| runCombineProcessor(kvIter, writer); |
| } |
| } |
| long rawLength = 0; |
| long partLength = 0; |
| //close |
| if (writer != null) { |
| writer.close(); |
| rawLength = writer.getRawLength(); |
| partLength = writer.getCompressedLength(); |
| } |
| adjustSpillCounters(rawLength, partLength); |
| // record offsets |
| final TezIndexRecord rec = |
| new TezIndexRecord(segmentStart, rawLength, partLength); |
| spillRec.putIndex(rec, i); |
| if (!isFinalMergeEnabled() && reportPartitionStats()) { |
| partitionStats[i] += partLength; |
| } |
| } |
| |
| Path indexFilename = |
| mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions |
| * MAP_OUTPUT_INDEX_RECORD_LENGTH); |
| spillFileIndexPaths.put(numSpills, indexFilename); |
| spillRec.writeToFile(indexFilename, conf, localFs); |
| //TODO: honor cache limits |
| indexCacheList.add(spillRec); |
| ++numSpills; |
| if (!isFinalMergeEnabled()) { |
| fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen()); |
| //No final merge. Set the number of files offered via shuffle-handler |
| numShuffleChunks.setValue(numSpills); |
| } |
| return true; |
| } finally { |
| if (out != null) { |
| out.close(); |
| } |
| } |
| } |
| |
| private boolean isThreadInterrupted() throws IOException { |
| if (Thread.currentThread().isInterrupted()) { |
| if (cleanup) { |
| cleanup(); |
| } |
| sortmaster.shutdownNow(); |
| LOG.info(outputContext.getInputOutputVertexNames() |
| + ": Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster.isShutdown() |
| + ", terminated=" + sortmaster.isTerminated()); |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public void flush() throws IOException { |
| final String uniqueIdentifier = outputContext.getUniqueIdentifier(); |
| |
| outputContext.notifyProgress(); |
| /** |
| * Possible that the thread got interrupted when flush was happening or when the flush was |
| * never invoked. As a part of cleanup activity in TezTaskRunner, it would invoke close() |
| * on all I/O. At that time, this is safe to cleanup |
| */ |
| if (isThreadInterrupted()) { |
| return; |
| } |
| |
| try { |
| LOG.info(outputContext.getInputOutputVertexNames() + ": Starting flush of map output"); |
| span.end(); |
| merger.add(span.sort(sorter)); |
| // force a spill in flush() |
| // case 1: we want to force because of following scenarios: |
| // we have no keys written, and flush got called |
| // we want atleast one spill(be it empty) |
| // case 2: in pipeline shuffle case, we have no way of |
| // knowing the last key being written until flush is called |
| // so for flush()->spill() we want to force spill so that |
| // we can send pipeline shuffle event with last event true. |
| spill(false); |
| sortmaster.shutdown(); |
| |
| //safe to clean up |
| buffers.clear(); |
| |
| |
| if(indexCacheList.isEmpty()) { |
| /* |
| * If we do not have this check, and if the task gets killed in the middle, it can throw |
| * NPE leading to distraction when debugging. |
| */ |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(outputContext.getInputOutputVertexNames() |
| + ": Index list is empty... returning"); |
| } |
| return; |
| } |
| |
| if (!isFinalMergeEnabled()) { |
| |
| //For pipelined shuffle, previous events are already sent. Just generate the last event alone |
| int startIndex = (pipelinedShuffle) ? (numSpills - 1) : 0; |
| int endIndex = numSpills; |
| |
| for (int i = startIndex; i < endIndex; i++) { |
| boolean isLastEvent = (i == numSpills - 1); |
| String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i); |
| ShuffleUtils.generateEventOnSpill(finalEvents, isFinalMergeEnabled(), isLastEvent, |
| outputContext, i, indexCacheList.get(i), partitions, |
| sendEmptyPartitionDetails, pathComponent, partitionStats, |
| reportDetailedPartitionStats(), auxiliaryService, deflater); |
| LOG.info(outputContext.getInputOutputVertexNames() + ": Adding spill event for spill (final update=" |
| + isLastEvent + "), spillId=" + i); |
| } |
| return; |
| } |
| |
| numAdditionalSpills.increment(numSpills - 1); |
| |
| //In case final merge is required, the following code path is executed. |
| if (numSpills == 1) { |
| // someday be able to pass this directly to shuffle |
| // without writing to disk |
| final Path filename = spillFilePaths.get(0); |
| final Path indexFilename = spillFileIndexPaths.get(0); |
| finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename); |
| finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename); |
| |
| sameVolRename(filename, finalOutputFile); |
| sameVolRename(indexFilename, finalIndexFile); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(outputContext.getInputOutputVertexNames() + ": numSpills=" + numSpills + |
| ", finalOutputFile=" + finalOutputFile + ", " |
| + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" + |
| indexFilename); |
| } |
| TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile, localFs); |
| if (reportPartitionStats()) { |
| for (int i = 0; i < spillRecord.size(); i++) { |
| partitionStats[i] += spillRecord.getIndex(i).getPartLength(); |
| } |
| } |
| numShuffleChunks.setValue(numSpills); |
| fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen()); |
| // ??? why are events not being sent here? |
| return; |
| } |
| |
| finalOutputFile = |
| mapOutputFile.getOutputFileForWrite(0); //TODO |
| finalIndexFile = |
| mapOutputFile.getOutputIndexFileForWrite(0); //TODO |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(outputContext.getInputOutputVertexNames() + ": " + |
| "numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:" |
| + finalIndexFile); |
| } |
| //The output stream for the final single output file |
| FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); |
| ensureSpillFilePermissions(finalOutputFile, rfs); |
| |
| final TezSpillRecord spillRec = new TezSpillRecord(partitions); |
| |
| for (int parts = 0; parts < partitions; parts++) { |
| boolean shouldWrite = false; |
| //create the segments to be merged |
| List<Segment> segmentList = |
| new ArrayList<Segment>(numSpills); |
| for (int i = 0; i < numSpills; i++) { |
| Path spillFilename = spillFilePaths.get(i); |
| TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); |
| if (indexRecord.hasData() || !sendEmptyPartitionDetails) { |
| shouldWrite = true; |
| DiskSegment s = |
| new DiskSegment(rfs, spillFilename, indexRecord.getStartOffset(), |
| indexRecord.getPartLength(), codec, ifileReadAhead, |
| ifileReadAheadLength, ifileBufferSize, true); |
| segmentList.add(s); |
| } |
| } |
| |
| int mergeFactor = |
| this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, |
| TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT); |
| // sort the segments only if there are intermediate merges |
| boolean sortSegments = segmentList.size() > mergeFactor; |
| //merge |
| TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs, |
| serializationContext, codec, |
| segmentList, mergeFactor, |
| new Path(uniqueIdentifier), |
| (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf), |
| progressable, sortSegments, true, |
| null, spilledRecordsCounter, additionalSpillBytesRead, |
| null, merger.needsRLE()); // Not using any Progress in TezMerger. Should just work. |
| //write merged output to disk |
| long segmentStart = finalOut.getPos(); |
| long rawLength = 0; |
| long partLength = 0; |
| if (shouldWrite) { |
| Writer writer = new Writer(serializationContext.getKeySerialization(), |
| serializationContext.getValSerialization(), finalOut, |
| serializationContext.getKeyClass(), serializationContext.getValueClass(), codec, |
| spilledRecordsCounter, null, merger.needsRLE()); |
| if (combiner == null || numSpills < minSpillsForCombine) { |
| TezMerger.writeFile(kvIter, writer, progressable, |
| TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); |
| } else { |
| runCombineProcessor(kvIter, writer); |
| } |
| |
| //close |
| writer.close(); |
| rawLength = writer.getRawLength(); |
| partLength = writer.getCompressedLength(); |
| } |
| outputBytesWithOverheadCounter.increment(rawLength); |
| |
| // record offsets |
| final TezIndexRecord rec = |
| new TezIndexRecord(segmentStart, rawLength, partLength); |
| spillRec.putIndex(rec, parts); |
| if (reportPartitionStats()) { |
| partitionStats[parts] += partLength; |
| } |
| } |
| |
| numShuffleChunks.setValue(1); //final merge has happened. |
| fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen()); |
| |
| spillRec.writeToFile(finalIndexFile, conf, localFs); |
| finalOut.close(); |
| for (int i = 0; i < numSpills; i++) { |
| Path indexFilename = spillFileIndexPaths.get(i); |
| Path spillFilename = spillFilePaths.get(i); |
| rfs.delete(indexFilename, true); |
| rfs.delete(spillFilename, true); |
| } |
| |
| spillFileIndexPaths.clear(); |
| spillFilePaths.clear(); |
| } catch(InterruptedException ie) { |
| if (cleanup) { |
| cleanup(); |
| } |
| Thread.currentThread().interrupt(); |
| throw new IOInterruptedException("Interrupted while closing Output", ie); |
| } |
| } |
| |
| /** |
| * Close and send events. |
| * @return events to be returned by the edge. |
| * @throws IOException parent can throw this. |
| */ |
| public final List<Event> close() throws IOException { |
| super.close(); |
| return finalEvents; |
| } |
| |
| |
| private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator { |
| int getPartition(); |
| Integer peekPartition(); |
| } |
| |
| private static class BufferStreamWrapper extends OutputStream |
| { |
| private final ByteBuffer out; |
| public BufferStreamWrapper(ByteBuffer out) { |
| this.out = out; |
| } |
| |
| @Override |
| public void write(int b) throws IOException { out.put((byte)b); } |
| @Override |
| public void write(byte[] b) throws IOException { out.put(b); } |
| @Override |
| public void write(byte[] b, int off, int len) throws IOException { out.put(b, off, len); } |
| } |
| |
| private static final class InputByteBuffer extends DataInputBuffer { |
| private byte[] buffer = new byte[256]; |
| private ByteBuffer wrapped = ByteBuffer.wrap(buffer); |
| private void resize(int length) { |
| if(length > buffer.length || (buffer.length > 10 * (1+length))) { |
| // scale down as well as scale up across values |
| buffer = new byte[length]; |
| wrapped = ByteBuffer.wrap(buffer); |
| } |
| wrapped.limit(length); |
| } |
| |
| // shallow copy |
| public void reset(DataInputBuffer clone) { |
| byte[] data = clone.getData(); |
| int start = clone.getPosition(); |
| int length = clone.getLength() - start; |
| super.reset(data, start, length); |
| } |
| |
| // deep copy |
| @SuppressWarnings("unused") |
| public void copy(DataInputBuffer clone) { |
| byte[] data = clone.getData(); |
| int start = clone.getPosition(); |
| int length = clone.getLength() - start; |
| resize(length); |
| System.arraycopy(data, start, buffer, 0, length); |
| super.reset(buffer, 0, length); |
| } |
| } |
| |
| private final class SortSpan implements IndexedSortable { |
| final IntBuffer kvmeta; |
| final byte[] rawkvmeta; |
| final int kvmetabase; |
| final ByteBuffer kvbuffer; |
| final NonSyncDataOutputStream out; |
| final RawComparator comparator; |
| final byte[] imeta = new byte[METASIZE]; |
| |
| private int index = 0; |
| private long eq = 0; |
| private boolean reinit = false; |
| private int capacity; |
| |
| |
| public SortSpan(ByteBuffer source, int maxItems, int perItem, RawComparator comparator) { |
| capacity = source.remaining(); |
| int metasize = METASIZE*maxItems; |
| int dataSize = maxItems * perItem; |
| if(capacity < (metasize+dataSize)) { |
| // try to allocate less meta space, because we have sample data |
| metasize = METASIZE*(capacity/(perItem+METASIZE)); |
| } |
| ByteBuffer reserved = source.duplicate(); |
| reserved.mark(); |
| LOG.info(outputContext.getInputOutputVertexNames() + ": " + "reserved.remaining()=" + |
| reserved.remaining() + ", reserved.metasize=" + metasize); |
| reserved.position(metasize); |
| kvbuffer = reserved.slice(); |
| reserved.flip(); |
| reserved.limit(metasize); |
| ByteBuffer kvmetabuffer = reserved.slice(); |
| rawkvmeta = kvmetabuffer.array(); |
| kvmetabase = kvmetabuffer.arrayOffset(); |
| kvmeta = kvmetabuffer |
| .order(ByteOrder.nativeOrder()) |
| .asIntBuffer(); |
| out = new NonSyncDataOutputStream( |
| new BufferStreamWrapper(kvbuffer)); |
| this.comparator = comparator; |
| } |
| |
| public SpanIterator sort(IndexedSorter sorter) { |
| long start = System.currentTimeMillis(); |
| if(length() > 1) { |
| sorter.sort(this, 0, length(), progressable); |
| } |
| LOG.info(outputContext.getInputOutputVertexNames() + ": " + "done sorting span=" + index + ", length=" + length() |
| + ", " + "time=" + (System.currentTimeMillis() - start)); |
| return new SpanIterator((SortSpan)this); |
| } |
| |
| int offsetFor(int i) { |
| return (i * NMETA); |
| } |
| |
| public void swap(final int mi, final int mj) { |
| final int kvi = offsetFor(mi); |
| final int kvj = offsetFor(mj); |
| |
| final int kvioff = kvmetabase + (kvi << 2); |
| final int kvjoff = kvmetabase + (kvj << 2); |
| System.arraycopy(rawkvmeta, kvioff, imeta, 0, METASIZE); |
| System.arraycopy(rawkvmeta, kvjoff, rawkvmeta, kvioff, METASIZE); |
| System.arraycopy(imeta, 0, rawkvmeta, kvjoff, METASIZE); |
| } |
| |
| protected int compareKeys(final int kvi, final int kvj) { |
| final int istart = kvmeta.get(kvi + KEYSTART); |
| final int jstart = kvmeta.get(kvj + KEYSTART); |
| final int ilen = kvmeta.get(kvi + VALSTART) - istart; |
| final int jlen = kvmeta.get(kvj + VALSTART) - jstart; |
| |
| if (ilen == 0 || jlen == 0) { |
| if (ilen == jlen) { |
| eq++; |
| } |
| return ilen - jlen; |
| } |
| |
| final byte[] buf = kvbuffer.array(); |
| final int off = kvbuffer.arrayOffset(); |
| |
| // sort by key |
| final int cmp = comparator.compare(buf, off + istart, ilen, buf, off + jstart, jlen); |
| if(cmp == 0) eq++; |
| return cmp; |
| } |
| |
| |
| public int compare(final int mi, final int mj) { |
| final int kvi = offsetFor(mi); |
| final int kvj = offsetFor(mj); |
| final int kvip = kvmeta.get(kvi + PARTITION); |
| final int kvjp = kvmeta.get(kvj + PARTITION); |
| // sort by partition |
| if (kvip != kvjp) { |
| return kvip - kvjp; |
| } |
| return compareKeys(kvi, kvj); |
| } |
| |
| public SortSpan next() { |
| ByteBuffer remaining = end(); |
| if(remaining != null) { |
| SortSpan newSpan = null; |
| int items = length(); |
| int perItem = kvbuffer.position()/items; |
| if (reinit) { //next mem block |
| //quite possible that the previous span had a length of 1. It is better to reinit here for new span. |
| items = 1024*1024; |
| perItem = 16; |
| } |
| final RawComparator newComparator = ConfigUtils.getIntermediateOutputKeyComparator(conf); |
| if (this.comparator == newComparator) { |
| LOG.warn("Same comparator used. comparator={}, newComparator={}," |
| + " hashCode: comparator={}, newComparator={}", |
| this.comparator, newComparator, |
| System.identityHashCode(this.comparator), |
| System.identityHashCode(newComparator)); |
| } |
| newSpan = new SortSpan(remaining, items, perItem, newComparator); |
| newSpan.index = index+1; |
| LOG.info( |
| String.format(outputContext.getInputOutputVertexNames() + ": " + "New Span%d.length = %d, perItem = %d", |
| newSpan.index, newSpan.length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue()); |
| return newSpan; |
| } |
| return null; |
| } |
| |
| public int length() { |
| return kvmeta.limit()/NMETA; |
| } |
| |
| public ByteBuffer end() { |
| ByteBuffer remaining = kvbuffer.duplicate(); |
| remaining.position(kvbuffer.position()); |
| remaining = remaining.slice(); |
| kvbuffer.limit(kvbuffer.position()); |
| kvmeta.limit(kvmeta.position()); |
| int items = length(); |
| if(items == 0) { |
| return null; |
| } |
| int perItem = kvbuffer.position()/items; |
| LOG.info(outputContext.getInputOutputVertexNames() + ": " |
| + String.format("Span%d.length = %d, perItem = %d", index, length(), perItem)); |
| if(remaining.remaining() < METASIZE+perItem) { |
| //Check if we can get the next Buffer from the main buffer list |
| ByteBuffer space = allocateSpace(); |
| if (space != null) { |
| LOG.info(outputContext.getInputOutputVertexNames() + ": " |
| + "Getting memory from next block in the list, recordsWritten=" + mapOutputRecordCounter.getValue()); |
| reinit = true; |
| return space; |
| } |
| return null; |
| } |
| return remaining; |
| } |
| |
| public int compareInternal(final DataInputBuffer needle, final int needlePart, final int index) { |
| int cmp = 0; |
| final int keystart; |
| final int valstart; |
| final int partition; |
| partition = kvmeta.get(this.offsetFor(index) + PARTITION); |
| if(partition != needlePart) { |
| cmp = (partition-needlePart); |
| } else { |
| keystart = kvmeta.get(this.offsetFor(index) + KEYSTART); |
| valstart = kvmeta.get(this.offsetFor(index) + VALSTART); |
| final byte[] buf = kvbuffer.array(); |
| final int off = kvbuffer.arrayOffset(); |
| cmp = comparator.compare(buf, |
| keystart + off , (valstart - keystart), |
| needle.getData(), |
| needle.getPosition(), (needle.getLength() - needle.getPosition())); |
| } |
| return cmp; |
| } |
| |
| public long getEq() { |
| return eq; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("Span[%d,%d]", NMETA*kvmeta.capacity(), kvbuffer.limit()); |
| } |
| } |
| |
| private static class SpanIterator implements PartitionedRawKeyValueIterator, Comparable<SpanIterator> { |
| private int kvindex = -1; |
| private final int maxindex; |
| private final IntBuffer kvmeta; |
| private final ByteBuffer kvbuffer; |
| private final SortSpan span; |
| private final InputByteBuffer key = new InputByteBuffer(); |
| private final InputByteBuffer value = new InputByteBuffer(); |
| private final Progress progress = new LocalProgress(); |
| |
| private static final int minrun = (1 << 4); |
| |
| public SpanIterator(SortSpan span) { |
| this.kvmeta = span.kvmeta; |
| this.kvbuffer = span.kvbuffer; |
| this.span = span; |
| this.maxindex = (kvmeta.limit()/NMETA) - 1; |
| } |
| |
| public DataInputBuffer getKey() { |
| final int keystart = kvmeta.get(span.offsetFor(kvindex) + KEYSTART); |
| final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART); |
| final byte[] buf = kvbuffer.array(); |
| final int off = kvbuffer.arrayOffset(); |
| key.reset(buf, off + keystart, valstart - keystart); |
| return key; |
| } |
| |
| public DataInputBuffer getValue() { |
| final int valstart = kvmeta.get(span.offsetFor(kvindex) + VALSTART); |
| final int vallen = kvmeta.get(span.offsetFor(kvindex) + VALLEN); |
| final byte[] buf = kvbuffer.array(); |
| final int off = kvbuffer.arrayOffset(); |
| value.reset(buf, off + valstart, vallen); |
| return value; |
| } |
| |
| public boolean next() { |
| // caveat: since we use this as a comparable in the merger |
| if(kvindex == maxindex) return false; |
| kvindex += 1; |
| if(kvindex % 100 == 0) { |
| progress.set(1 - ((maxindex - kvindex) / (float) maxindex)); |
| } |
| return true; |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return (kvindex == maxindex); |
| } |
| |
| public void close() { |
| } |
| |
| public Progress getProgress() { |
| return progress; |
| } |
| |
| @Override |
| public boolean isSameKey() throws IOException { |
| return false; |
| } |
| |
| public int getPartition() { |
| final int partition = kvmeta.get(span.offsetFor(kvindex) + PARTITION); |
| return partition; |
| } |
| |
| public Integer peekPartition() { |
| if (!hasNext()) { |
| return null; |
| } else { |
| return kvmeta.get(span.offsetFor(kvindex + 1) + PARTITION); |
| } |
| } |
| |
| @SuppressWarnings("unused") |
| public int size() { |
| return (maxindex - kvindex); |
| } |
| |
| public int compareTo(SpanIterator other) { |
| return span.compareInternal(other.getKey(), other.getPartition(), kvindex); |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString()); |
| } |
| |
| /** |
| * bisect returns the next insertion point for a given raw key, skipping keys |
| * which are <= needle using a binary search instead of a linear comparison. |
| * This is massively efficient when long strings of identical keys occur. |
| * @param needle |
| * @param needlePart |
| * @return |
| */ |
| int bisect(DataInputBuffer needle, int needlePart) { |
| int start = kvindex; |
| int end = maxindex-1; |
| int mid = start; |
| int cmp = 0; |
| |
| if(end - start < minrun) { |
| return 0; |
| } |
| |
| if(span.compareInternal(needle, needlePart, start) > 0) { |
| return kvindex; |
| } |
| |
| // bail out early if we haven't got a min run |
| if(span.compareInternal(needle, needlePart, start+minrun) > 0) { |
| return 0; |
| } |
| |
| if(span.compareInternal(needle, needlePart, end) < 0) { |
| return end - kvindex; |
| } |
| |
| boolean found = false; |
| |
| // we sort 100k items, the max it can do is 20 loops, but break early |
| for(int i = 0; start < end && i < 16; i++) { |
| mid = start + (end - start)/2; |
| cmp = span.compareInternal(needle, needlePart, mid); |
| if(cmp == 0) { |
| start = mid; |
| found = true; |
| } else if(cmp < 0) { |
| start = mid; |
| found = true; |
| } |
| if(cmp > 0) { |
| end = mid; |
| } |
| } |
| |
| if(found) { |
| return start - kvindex; |
| } |
| return 0; |
| } |
| } |
| |
| private static class SortTask extends CallableWithNdc<SpanIterator> { |
| private final SortSpan sortable; |
| private final IndexedSorter sorter; |
| |
| public SortTask(SortSpan sortable, IndexedSorter sorter) { |
| this.sortable = sortable; |
| this.sorter = sorter; |
| } |
| |
| @Override |
| protected SpanIterator callInternal() { |
| return sortable.sort(sorter); |
| } |
| } |
| |
| private class PartitionFilter implements TezRawKeyValueIterator { |
| private final PartitionedRawKeyValueIterator iter; |
| private int partition; |
| private boolean dirty = false; |
| public PartitionFilter(PartitionedRawKeyValueIterator iter) { |
| this.iter = iter; |
| } |
| public DataInputBuffer getKey() throws IOException { return iter.getKey(); } |
| public DataInputBuffer getValue() throws IOException { return iter.getValue(); } |
| public void close() throws IOException { } |
| public Progress getProgress() { |
| return new Progress(); |
| } |
| |
| @Override |
| public boolean isSameKey() throws IOException { |
| return iter.isSameKey(); |
| } |
| |
| public boolean next() throws IOException { |
| if(dirty || iter.next()) { |
| int prefix = iter.getPartition(); |
| |
| if((prefix >>> (32 - partitionBits)) == partition) { |
| dirty = false; // we found what we were looking for, good |
| return true; |
| } else if(!dirty) { |
| dirty = true; // we did a lookahead and failed to find partition |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean hasNext() throws IOException { |
| if (dirty || iter.hasNext()) { |
| Integer part; |
| if (dirty) { |
| part = iter.getPartition(); |
| } else { |
| part = iter.peekPartition(); |
| } |
| |
| if (part != null) { |
| return (part >>> (32 - partitionBits)) == partition; |
| } |
| } |
| return false; |
| } |
| |
| public void reset(int partition) { |
| this.partition = partition; |
| } |
| |
| @SuppressWarnings("unused") |
| public int getPartition() { |
| return this.partition; |
| } |
| } |
| |
| private static class SpanHeap extends java.util.PriorityQueue<SpanIterator> { |
| private static final long serialVersionUID = 1L; |
| |
| public SpanHeap() { |
| super(256); |
| } |
| /** |
| * {@link PriorityQueue}.poll() by a different name |
| * @return |
| */ |
| public SpanIterator pop() { |
| return this.poll(); |
| } |
| } |
| |
| @InterfaceAudience.Private |
| @VisibleForTesting |
| public boolean needsRLE() { |
| return merger.needsRLE(); |
| } |
| |
| private final class SpanMerger implements PartitionedRawKeyValueIterator { |
| InputByteBuffer key = new InputByteBuffer(); |
| InputByteBuffer value = new InputByteBuffer(); |
| int partition; |
| |
| private ArrayList< Future<SpanIterator>> futures = new ArrayList< Future<SpanIterator>>(); |
| |
| private SpanHeap heap = new SpanHeap(); |
| private PartitionFilter partIter; |
| |
| private int gallop = 0; |
| private SpanIterator horse; |
| private long total = 0; |
| private long eq = 0; |
| |
| public SpanMerger() { |
| // SpanIterators are comparable |
| partIter = new PartitionFilter(this); |
| } |
| |
| public final void add(SpanIterator iter) { |
| if(iter.next()) { |
| heap.add(iter); |
| } |
| } |
| |
| public final void add(Future<SpanIterator> iter) { |
| this.futures.add(iter); |
| } |
| |
| public final boolean ready() throws IOException, InterruptedException { |
| int numSpanItr = futures.size(); |
| try { |
| SpanIterator iter = null; |
| while(this.futures.size() > 0) { |
| Future<SpanIterator> futureIter = this.futures.remove(0); |
| iter = futureIter.get(); |
| this.add(iter); |
| } |
| |
| StringBuilder sb = new StringBuilder(); |
| if (heap.size() == 0) { |
| return false; |
| } |
| for(SpanIterator sp: heap) { |
| sb.append(sp.toString()); |
| sb.append(","); |
| total += sp.span.length(); |
| eq += sp.span.getEq(); |
| } |
| LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Heap = " + sb.toString()); |
| return true; |
| } catch(ExecutionException e) { |
| LOG.error("Heap size={}, total={}, eq={}, partition={}, gallop={}, totalItr={}," |
| + " futures.size={}, destVertexName={}", |
| heap.size(), total, eq, partition, gallop, numSpanItr, futures.size(), |
| outputContext.getDestinationVertexName(), e); |
| throw new IOException(e); |
| } |
| } |
| |
| private SpanIterator pop() { |
| if(gallop > 0) { |
| gallop--; |
| return horse; |
| } |
| SpanIterator current = heap.pop(); |
| SpanIterator next = heap.peek(); |
| if(next != null && current != null && |
| ((Object)horse) == ((Object)current)) { |
| // TODO: a better threshold check than 1 key repeating |
| gallop = current.bisect(next.getKey(), next.getPartition())-1; |
| } |
| horse = current; |
| return current; |
| } |
| |
| public boolean needsRLE() { |
| return (eq > 0.1 * total); |
| } |
| |
| @SuppressWarnings("unused") |
| private SpanIterator peek() { |
| if (gallop > 0) { |
| return horse; |
| } |
| return heap.peek(); |
| } |
| |
| public final boolean next() { |
| SpanIterator current = pop(); |
| |
| if(current != null) { |
| partition = current.getPartition(); |
| key.reset(current.getKey()); |
| value.reset(current.getValue()); |
| if(gallop <= 0) { |
| // since all keys and values are references to the kvbuffer, no more deep copies |
| this.add(current); |
| } else { |
| // galloping, no deep copies required anyway |
| current.next(); |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return peek() != null; |
| } |
| |
| public Integer peekPartition() { |
| if (!hasNext()) { |
| return null; |
| } else { |
| SpanIterator peek = peek(); |
| return peek.getPartition(); |
| } |
| } |
| |
| public DataInputBuffer getKey() { return key; } |
| public DataInputBuffer getValue() { return value; } |
| public int getPartition() { return partition; } |
| |
| public void close() throws IOException { |
| } |
| |
| public Progress getProgress() { |
| // TODO |
| return new Progress(); |
| } |
| |
| @Override |
| public boolean isSameKey() throws IOException { |
| return false; |
| } |
| |
| public TezRawKeyValueIterator filter(int partition) { |
| partIter.reset(partition); |
| return partIter; |
| } |
| |
| } |
| } |