| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.tez.runtime.library.common.writers; |
| |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.nio.IntBuffer; |
| import java.util.ArrayList; |
| import java.util.BitSet; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.SynchronousQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.zip.Deflater; |
| |
| import com.google.common.collect.Lists; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.DataInputBuffer; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.compress.CompressionCodec; |
| import org.apache.tez.common.CallableWithNdc; |
| import org.apache.tez.common.GuavaShim; |
| import org.apache.tez.common.TezCommonUtils; |
| import org.apache.tez.common.TezUtilsInternal; |
| import org.apache.tez.common.counters.TaskCounter; |
| import org.apache.tez.common.counters.TezCounter; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.common.io.NonSyncDataOutputStream; |
| import org.apache.tez.runtime.api.Event; |
| import org.apache.tez.runtime.api.TaskFailureType; |
| import org.apache.tez.runtime.api.OutputContext; |
| import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; |
| import org.apache.tez.runtime.library.api.IOInterruptedException; |
| import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats; |
| import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; |
| import org.apache.tez.runtime.library.common.Constants; |
| import org.apache.tez.runtime.library.common.sort.impl.IFile; |
| import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; |
| import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; |
| import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; |
| import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; |
| import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; |
| import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.apache.tez.common.Preconditions; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import com.google.protobuf.ByteString; |
| |
| import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions; |
| |
| public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWriter { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(UnorderedPartitionedKVWriter.class); |
| |
| private static final int INT_SIZE = 4; |
| private static final int NUM_META = 3; // Number of meta fields. |
| private static final int INDEX_KEYLEN = 0; // KeyLength index |
| private static final int INDEX_VALLEN = 1; // ValLength index |
| private static final int INDEX_NEXT = 2; // Next Record Index. |
  private static final int META_SIZE = NUM_META * INT_SIZE; // Total size of the meta-data.
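
  // Per-record layout within a buffer: [keyLen, valLen, nextPos] meta ints
  // followed by the serialized key and value bytes. nextPos chains records of
  // the same partition, rooted at WrappedBuffer.partitionPositions[partition].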
| |
  private static final int APPROX_HEADER_LENGTH = 150;
| |
  // Maybe set up a separate statistics class that can be shared between the
  // buffer and the main path instead of having multiple arrays.
| |
| private final String sourceDestNameTrimmed; |
| private final long availableMemory; |
| @VisibleForTesting |
| final WrappedBuffer[] buffers; |
| @VisibleForTesting |
| final BlockingQueue<WrappedBuffer> availableBuffers; |
| private final ByteArrayOutputStream baos; |
| private final NonSyncDataOutputStream dos; |
| @VisibleForTesting |
| WrappedBuffer currentBuffer; |
| private final FileSystem rfs; |
| |
| @VisibleForTesting |
| final List<SpillInfo> spillInfoList = Collections.synchronizedList(new ArrayList<SpillInfo>()); |
| |
| private final ListeningExecutorService spillExecutor; |
| |
| private final int[] numRecordsPerPartition; |
| private long localOutputRecordBytesCounter; |
| private long localOutputBytesWithOverheadCounter; |
| private long localOutputRecordsCounter; |
  // Notify progress after every NOTIFY_THRESHOLD records.
| private static final int NOTIFY_THRESHOLD = 1000; |
| // uncompressed size for each partition |
| private final long[] sizePerPartition; |
| private volatile long spilledSize = 0; |
| private boolean dataViaEventsEnabled; |
| private int dataViaEventsMaxSize; |
| |
| static final ThreadLocal<Deflater> deflater = new ThreadLocal<Deflater>() { |
| |
| @Override |
| public Deflater initialValue() { |
| return TezCommonUtils.newBestCompressionDeflater(); |
| } |
| |
| @Override |
| public Deflater get() { |
| Deflater deflater = super.get(); |
| deflater.reset(); |
| return deflater; |
| } |
| }; |
| |
| private final Semaphore availableSlots; |
| |
| /** |
| * Represents final number of records written (spills are not counted) |
| */ |
| protected final TezCounter outputLargeRecordsCounter; |
| |
| @VisibleForTesting |
| int numBuffers; |
| @VisibleForTesting |
| int sizePerBuffer; |
| @VisibleForTesting |
| int lastBufferSize; |
| @VisibleForTesting |
| int numInitializedBuffers; |
| @VisibleForTesting |
| int spillLimit; |
| |
| private Throwable spillException; |
| private AtomicBoolean isShutdown = new AtomicBoolean(false); |
| @VisibleForTesting |
| final AtomicInteger numSpills = new AtomicInteger(0); |
| private final AtomicInteger pendingSpillCount = new AtomicInteger(0); |
| |
| @VisibleForTesting |
| Path finalIndexPath; |
| @VisibleForTesting |
| Path finalOutPath; |
| |
  // For single-partition cases (e.g. UnorderedKVOutput).
| @VisibleForTesting |
| final IFile.Writer writer; |
| @VisibleForTesting |
| final boolean skipBuffers; |
| |
| private final ReentrantLock spillLock = new ReentrantLock(); |
| private final Condition spillInProgress = spillLock.newCondition(); |
| |
| private final boolean pipelinedShuffle; |
| private final boolean isFinalMergeEnabled; |
| // To store events when final merge is disabled |
| private final List<Event> finalEvents; |
| // How partition stats should be reported. |
| final ReportPartitionStats reportPartitionStats; |
| |
| private final long indexFileSizeEstimate; |
| |
| private List<WrappedBuffer> filledBuffers = new ArrayList<>(); |
| |
| // When enabled, uses in-mem ifile writer |
| private final boolean useCachedStream; |
| |
| public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf, |
| int numOutputs, long availableMemoryBytes) throws IOException { |
| super(outputContext, conf, numOutputs); |
| |
| Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes"); |
| |
| this.sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getTaskVertexName()) + " -> " |
| + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()); |
| //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in |
| // this case. Add it later if needed. |
| boolean pipelinedShuffleConf = this.conf.getBoolean(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT); |
| this.isFinalMergeEnabled = conf.getBoolean( |
| TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, |
| TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT); |
| this.pipelinedShuffle = pipelinedShuffleConf && !isFinalMergeEnabled; |
| this.finalEvents = Lists.newLinkedList(); |
| |
| this.dataViaEventsEnabled = conf.getBoolean( |
| TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED, |
| TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED_DEFAULT); |
| |
| // No max cap on size (intentional) |
| this.dataViaEventsMaxSize = conf.getInt( |
| TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE, |
| TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT); |
| |
| boolean useCachedStreamConfig = conf.getBoolean( |
| TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE, |
| TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE_DEFAULT); |
| |
| this.useCachedStream = useCachedStreamConfig && (this.dataViaEventsEnabled && (numPartitions == 1) |
| && !pipelinedShuffle); |
| |
| if (availableMemoryBytes == 0) { |
| Preconditions.checkArgument(((numPartitions == 1) && !pipelinedShuffle), "availableMemory " |
| + "can be set to 0 only when numPartitions=1 and " + TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " is disabled. current numPartitions=" + |
| numPartitions + ", " + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + "=" |
| + pipelinedShuffle); |
| } |
| |
| // Ideally, should be significantly larger. |
| availableMemory = availableMemoryBytes; |
| |
| // Allow unit tests to control the buffer sizes. |
| int maxSingleBufferSizeBytes = conf.getInt( |
| TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, |
| Integer.MAX_VALUE); |
| computeNumBuffersAndSize(maxSingleBufferSizeBytes); |
| |
| availableBuffers = new LinkedBlockingQueue<WrappedBuffer>(); |
| buffers = new WrappedBuffer[numBuffers]; |
| // Set up only the first buffer to start with. |
| buffers[0] = new WrappedBuffer(numOutputs, sizePerBuffer); |
| numInitializedBuffers = 1; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(sourceDestNameTrimmed + ": " + "Initializing Buffer #" + |
| numInitializedBuffers + " with size=" + sizePerBuffer); |
| } |
| currentBuffer = buffers[0]; |
| baos = new ByteArrayOutputStream(); |
| dos = new NonSyncDataOutputStream(baos); |
| keySerializer.open(dos); |
| valSerializer.open(dos); |
| rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw(); |
| |
    int maxThreads = Math.max(2, numBuffers / 2);
| //TODO: Make use of TezSharedExecutor later |
| ExecutorService executor = new ThreadPoolExecutor(1, maxThreads, |
| 60L, TimeUnit.SECONDS, |
| new SynchronousQueue<Runnable>(), |
| new ThreadFactoryBuilder() |
| .setDaemon(true) |
| .setNameFormat( |
| "UnorderedOutSpiller {" + TezUtilsInternal.cleanVertexName( |
| outputContext.getDestinationVertexName()) + "} #%d") |
| .build() |
| ); |
    // Restrict submission of more tasks than threads (e.g. numBuffers > numThreads).
    // This is maxThreads - 1 to avoid a race between the callback thread releasing
    // the semaphore and the thread calling tryAcquire.
| availableSlots = new Semaphore(maxThreads - 1, true); |
| spillExecutor = MoreExecutors.listeningDecorator(executor); |
| numRecordsPerPartition = new int[numPartitions]; |
| reportPartitionStats = ReportPartitionStats.fromString( |
| conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, |
| TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT)); |
| sizePerPartition = (reportPartitionStats.isEnabled()) ? |
| new long[numPartitions] : null; |
| |
| outputLargeRecordsCounter = outputContext.getCounters().findCounter( |
| TaskCounter.OUTPUT_LARGE_RECORDS); |
| |
| indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH; |
| |
    if (numPartitions == 1 && !pipelinedShuffle) {
      // Special case, where only one partition is available.
| skipBuffers = true; |
| if (this.useCachedStream) { |
| writer = new IFile.FileBackedInMemIFileWriter(keySerialization, valSerialization, rfs, |
| outputFileHandler, keyClass, valClass, codec, outputRecordsCounter, |
| outputRecordBytesCounter, dataViaEventsMaxSize); |
| } else { |
| finalOutPath = outputFileHandler.getOutputFileForWrite(); |
| writer = new IFile.Writer(keySerialization, valSerialization, rfs, finalOutPath, keyClass, valClass, |
| codec, outputRecordsCounter, outputRecordBytesCounter); |
| ensureSpillFilePermissions(finalOutPath, rfs); |
| } |
| } else { |
| skipBuffers = false; |
| writer = null; |
| } |
| LOG.info(sourceDestNameTrimmed + ": " |
| + "numBuffers=" + numBuffers |
| + ", sizePerBuffer=" + sizePerBuffer |
| + ", skipBuffers=" + skipBuffers |
| + ", numPartitions=" + numPartitions |
| + ", availableMemory=" + availableMemory |
| + ", maxSingleBufferSizeBytes=" + maxSingleBufferSizeBytes |
| + ", pipelinedShuffle=" + pipelinedShuffle |
| + ", isFinalMergeEnabled=" + isFinalMergeEnabled |
| + ", numPartitions=" + numPartitions |
| + ", reportPartitionStats=" + reportPartitionStats |
| + ", dataViaEventsEnabled=" + dataViaEventsEnabled |
| + ", dataViaEventsMaxSize=" + dataViaEventsMaxSize |
| + ", useCachedStreamConfig=" + useCachedStreamConfig |
| + ", useCachedStream=" + useCachedStream |
| ); |
| } |
| |
| private static final int ALLOC_OVERHEAD = 64; |
| private void computeNumBuffersAndSize(int bufferLimit) { |
| numBuffers = (int)(availableMemory / bufferLimit); |
| |
| if (numBuffers >= 2) { |
| sizePerBuffer = bufferLimit - ALLOC_OVERHEAD; |
| lastBufferSize = (int)(availableMemory % bufferLimit); |
      // Use leftover memory for the last buffer only if the leftover is > 50% of bufferLimit
| if (lastBufferSize > bufferLimit / 2) { |
| numBuffers += 1; |
| } else { |
| if (lastBufferSize > 0) { |
| LOG.warn("Underallocating memory. Unused memory size: {}.", lastBufferSize); |
| } |
| lastBufferSize = sizePerBuffer; |
| } |
| } else { |
      // We should have a minimum of 2 buffers.
| numBuffers = 2; |
| if (availableMemory / numBuffers > Integer.MAX_VALUE) { |
| sizePerBuffer = Integer.MAX_VALUE; |
| } else { |
| sizePerBuffer = (int)(availableMemory / numBuffers); |
| } |
| // 2 equal sized buffers. |
| lastBufferSize = sizePerBuffer; |
| } |
    // Ensure the allocation sizes are a multiple of INT_SIZE; truncate down.
| sizePerBuffer = sizePerBuffer - (sizePerBuffer % INT_SIZE); |
| lastBufferSize = lastBufferSize - (lastBufferSize % INT_SIZE); |
| |
| int mergePercent = conf.getInt( |
| TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT, |
| TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT_DEFAULT); |
| spillLimit = numBuffers * mergePercent / 100; |
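    // e.g. numBuffers=10 with mergePercent=30 schedules a spill once 3 buffers fill.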
| // Keep within limits. |
| if (spillLimit < 1) { |
| spillLimit = 1; |
| } |
| if (spillLimit > numBuffers) { |
| spillLimit = numBuffers; |
| } |
| } |
| |
| @Override |
| public void write(Object key, Object value) throws IOException { |
| // Skipping checks for key-value types. IFile takes care of these, but should be removed from |
| // there as well. |
| |
    // How expensive are checks like these?
| if (isShutdown.get()) { |
| throw new RuntimeException("Writer already closed"); |
| } |
| if (spillException != null) { |
| // Already reported as a fatalError - report to the user code |
| throw new IOException("Exception during spill", new IOException(spillException)); |
| } |
| if (skipBuffers) { |
| //special case, where we have only one partition and pipelining is disabled. |
| // The reason outputRecordsCounter isn't updated here: |
| // For skipBuffers case, IFile writer has the reference to |
| // outputRecordsCounter and during its close method call, |
| // it will update the outputRecordsCounter. |
| writer.append(key, value); |
| outputContext.notifyProgress(); |
| } else { |
| int partition = partitioner.getPartition(key, value, numPartitions); |
| write(key, value, partition); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private void write(Object key, Object value, int partition) throws IOException { |
    // Align to a 4-byte (int) boundary for the metadata.
| int mod = currentBuffer.nextPosition % INT_SIZE; |
| int metaSkip = mod == 0 ? 0 : (INT_SIZE - mod); |
| if ((currentBuffer.availableSize < (META_SIZE + metaSkip)) || (currentBuffer.full)) { |
| // Move over to the next buffer. |
| metaSkip = 0; |
| setupNextBuffer(); |
| } |
| currentBuffer.nextPosition += metaSkip; |
| int metaStart = currentBuffer.nextPosition; |
| currentBuffer.availableSize -= (META_SIZE + metaSkip); |
| currentBuffer.nextPosition += META_SIZE; |
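    // The meta slots are reserved here and filled in after serialization, once
    // the actual key and value lengths are known.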
| |
| keySerializer.serialize(key); |
| |
| if (currentBuffer.full) { |
      if (metaStart == 0) { // Started writing at the start of the buffer.
        // Key too large for any buffer. Write the entire record to disk.
| currentBuffer.reset(); |
| writeLargeRecord(key, value, partition); |
| return; |
| } else { // Exceeded length on current buffer. |
| // Try resetting the buffer to the next one, if this was not the start of a buffer, |
| // and begin spilling the current buffer to disk if it has any records. |
| setupNextBuffer(); |
| write(key, value, partition); |
| return; |
| } |
| } |
| |
| int valStart = currentBuffer.nextPosition; |
| valSerializer.serialize(value); |
| |
| if (currentBuffer.full) { |
| // Value too large for current buffer, or K-V too large for entire buffer. |
| if (metaStart == 0) { |
| // Key + Value too large for a single buffer. |
| currentBuffer.reset(); |
| writeLargeRecord(key, value, partition); |
| return; |
| } else { // Exceeded length on current buffer. |
| // Try writing key+value to a new buffer - will fall back to disk if that fails. |
| setupNextBuffer(); |
| write(key, value, partition); |
| return; |
| } |
| } |
| |
| // Meta-data updates |
| int metaIndex = metaStart / INT_SIZE; |
| int indexNext = currentBuffer.partitionPositions[partition]; |
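    // indexNext is the previous head of this partition's record chain (or
    // PARTITION_ABSENT_POSITION); metaStart becomes the new head below.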
| |
| currentBuffer.metaBuffer.put(metaIndex + INDEX_KEYLEN, (valStart - (metaStart + META_SIZE))); |
| currentBuffer.metaBuffer.put(metaIndex + INDEX_VALLEN, (currentBuffer.nextPosition - valStart)); |
| currentBuffer.metaBuffer.put(metaIndex + INDEX_NEXT, indexNext); |
| currentBuffer.skipSize += metaSkip; // For size estimation |
| // Update stats on number of records |
| localOutputRecordBytesCounter += (currentBuffer.nextPosition - (metaStart + META_SIZE)); |
| localOutputBytesWithOverheadCounter += ((currentBuffer.nextPosition - metaStart) + metaSkip); |
| localOutputRecordsCounter++; |
    if (localOutputRecordsCounter % NOTIFY_THRESHOLD == 0) {
| updateTezCountersAndNotify(); |
| } |
| currentBuffer.partitionPositions[partition] = metaStart; |
| currentBuffer.recordsPerPartition[partition]++; |
| currentBuffer.sizePerPartition[partition] += |
| currentBuffer.nextPosition - (metaStart + META_SIZE); |
| currentBuffer.numRecords++; |
| |
| } |
| |
| private void updateTezCountersAndNotify() { |
| outputRecordBytesCounter.increment(localOutputRecordBytesCounter); |
| outputBytesWithOverheadCounter.increment(localOutputBytesWithOverheadCounter); |
| outputRecordsCounter.increment(localOutputRecordsCounter); |
| outputContext.notifyProgress(); |
| localOutputRecordBytesCounter = 0; |
| localOutputBytesWithOverheadCounter = 0; |
| localOutputRecordsCounter = 0; |
| } |
| |
| private void setupNextBuffer() throws IOException { |
| |
| if (currentBuffer.numRecords == 0) { |
| currentBuffer.reset(); |
| } else { |
| // Update overall stats |
| final int filledBufferCount = filledBuffers.size(); |
| if (LOG.isDebugEnabled() || (filledBufferCount % 10) == 0) { |
| LOG.info(sourceDestNameTrimmed + ": " + "Moving to next buffer. Total filled buffers: " + filledBufferCount); |
| } |
| updateGlobalStats(currentBuffer); |
| |
| filledBuffers.add(currentBuffer); |
| mayBeSpill(false); |
| |
| currentBuffer = getNextAvailableBuffer(); |
| |
| // in case spill threads are free, check if spilling is needed |
| mayBeSpill(false); |
| } |
| } |
| |
| private void mayBeSpill(boolean shouldBlock) throws IOException { |
| if (filledBuffers.size() >= spillLimit) { |
      // Whether to block for a spill slot is up to the caller; any leftover
      // buffers are picked up by a subsequent spill.
| scheduleSpill(shouldBlock); |
| } |
| } |
| |
| private boolean scheduleSpill(boolean block) throws IOException { |
| if (filledBuffers.isEmpty()) { |
| return false; |
| } |
| |
| try { |
| if (block) { |
| availableSlots.acquire(); |
| } else { |
| if (!availableSlots.tryAcquire()) { |
| // Data in filledBuffers would be spilled in subsequent iteration. |
| return false; |
| } |
| } |
| |
| final int filledBufferCount = filledBuffers.size(); |
| if (LOG.isDebugEnabled() || (filledBufferCount % 10) == 0) { |
| LOG.info(sourceDestNameTrimmed + ": triggering spill. filledBuffers.size=" + filledBufferCount); |
| } |
| pendingSpillCount.incrementAndGet(); |
| int spillNumber = numSpills.getAndIncrement(); |
| |
| ListenableFuture<SpillResult> future = spillExecutor.submit(new SpillCallable( |
| new ArrayList<WrappedBuffer>(filledBuffers), codec, spilledRecordsCounter, |
| spillNumber)); |
| filledBuffers.clear(); |
| Futures.addCallback(future, new SpillCallback(spillNumber), GuavaShim.directExecutor()); |
| // Update once per buffer (instead of every record) |
| updateTezCountersAndNotify(); |
| return true; |
    } catch (InterruptedException ie) {
| Thread.currentThread().interrupt(); // reset interrupt status |
| } |
| return false; |
| } |
| |
| private boolean reportPartitionStats() { |
| return (sizePerPartition != null); |
| } |
| |
| private void updateGlobalStats(WrappedBuffer buffer) { |
| for (int i = 0; i < numPartitions; i++) { |
| numRecordsPerPartition[i] += buffer.recordsPerPartition[i]; |
| if (reportPartitionStats()) { |
| sizePerPartition[i] += buffer.sizePerPartition[i]; |
| } |
| } |
| } |
| |
| private WrappedBuffer getNextAvailableBuffer() throws IOException { |
| if (availableBuffers.peek() == null) { |
| if (numInitializedBuffers < numBuffers) { |
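        // Buffers are allocated lazily; the final buffer may use the (possibly
        // different) lastBufferSize computed at construction time.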
| buffers[numInitializedBuffers] = new WrappedBuffer(numPartitions, |
| numInitializedBuffers == numBuffers - 1 ? lastBufferSize : sizePerBuffer); |
| numInitializedBuffers++; |
| return buffers[numInitializedBuffers - 1]; |
| } else { |
| // All buffers initialized, and none available right now. Wait |
| try { |
| // Ensure that spills are triggered so that buffers can be released. |
| mayBeSpill(true); |
| return availableBuffers.take(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOInterruptedException("Interrupted while waiting for next buffer", e); |
| } |
| } |
| } else { |
| return availableBuffers.poll(); |
| } |
| } |
| |
  // All spills use compression for now.
| private class SpillCallable extends CallableWithNdc<SpillResult> { |
| |
| private final List<WrappedBuffer> filledBuffers; |
| private final CompressionCodec codec; |
| private final TezCounter numRecordsCounter; |
| private int spillIndex; |
| private SpillPathDetails spillPathDetails; |
| private int spillNumber; |
| |
| public SpillCallable(List<WrappedBuffer> filledBuffers, CompressionCodec codec, |
| TezCounter numRecordsCounter, SpillPathDetails spillPathDetails) { |
| this(filledBuffers, codec, numRecordsCounter, spillPathDetails.spillIndex); |
| Preconditions.checkArgument(spillPathDetails.outputFilePath != null, "Spill output file " |
| + "path can not be null"); |
| this.spillPathDetails = spillPathDetails; |
| } |
| |
| public SpillCallable(List<WrappedBuffer> filledBuffers, CompressionCodec codec, |
| TezCounter numRecordsCounter, int spillNumber) { |
| this.filledBuffers = filledBuffers; |
| this.codec = codec; |
| this.numRecordsCounter = numRecordsCounter; |
| this.spillNumber = spillNumber; |
| } |
| |
| @Override |
| protected SpillResult callInternal() throws IOException { |
| // This should not be called with an empty buffer. Check before invoking. |
| |
| // Number of parallel spills determined by number of threads. |
| // Last spill synchronization handled separately. |
| SpillResult spillResult = null; |
| if (spillPathDetails == null) { |
| this.spillPathDetails = getSpillPathDetails(false, -1, spillNumber); |
| this.spillIndex = spillPathDetails.spillIndex; |
| } |
| LOG.info("Writing spill " + spillNumber + " to " + spillPathDetails.outputFilePath.toString()); |
| FSDataOutputStream out = rfs.create(spillPathDetails.outputFilePath); |
| ensureSpillFilePermissions(spillPathDetails.outputFilePath, rfs); |
| TezSpillRecord spillRecord = new TezSpillRecord(numPartitions); |
| DataInputBuffer key = new DataInputBuffer(); |
| DataInputBuffer val = new DataInputBuffer(); |
| long compressedLength = 0; |
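      // One IFile segment per non-empty partition is appended to a single spill
      // file; spillRecord tracks each segment's offset and length for the index.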
| for (int i = 0; i < numPartitions; i++) { |
| IFile.Writer writer = null; |
| try { |
| long segmentStart = out.getPos(); |
| long numRecords = 0; |
| for (WrappedBuffer buffer : filledBuffers) { |
| outputContext.notifyProgress(); |
| if (buffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) { |
| // Skip empty partition. |
| continue; |
| } |
| if (writer == null) { |
| writer = new Writer(keySerialization, valSerialization, out, keyClass, valClass, codec, null, null); |
| } |
| numRecords += writePartition(buffer.partitionPositions[i], buffer, writer, key, val); |
| } |
| if (writer != null) { |
| if (numRecordsCounter != null) { |
            // TezCounter is not thread-safe. Since numRecordsCounter can be
            // updated from multiple spill threads, synchronize on it when
            // incrementing for correctness.
| synchronized (numRecordsCounter) { |
| numRecordsCounter.increment(numRecords); |
| } |
| } |
| writer.close(); |
| compressedLength += writer.getCompressedLength(); |
| TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(), |
| writer.getCompressedLength()); |
| spillRecord.putIndex(indexRecord, i); |
| writer = null; |
| } |
| } finally { |
| if (writer != null) { |
| writer.close(); |
| } |
| } |
| } |
| key.close(); |
| val.close(); |
| |
| spillResult = new SpillResult(compressedLength, this.filledBuffers); |
| |
| handleSpillIndex(spillPathDetails, spillRecord); |
| LOG.info(sourceDestNameTrimmed + ": " + "Finished spill " + spillIndex); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(sourceDestNameTrimmed + ": " + "Spill=" + spillIndex + ", indexPath=" |
| + spillPathDetails.indexFilePath + ", outputPath=" + spillPathDetails.outputFilePath); |
| } |
| return spillResult; |
| } |
| } |
| |
| private long writePartition(int pos, WrappedBuffer wrappedBuffer, Writer writer, |
| DataInputBuffer keyBuffer, DataInputBuffer valBuffer) throws IOException { |
| long numRecords = 0; |
| while (pos != WrappedBuffer.PARTITION_ABSENT_POSITION) { |
| int metaIndex = pos / INT_SIZE; |
| int keyLength = wrappedBuffer.metaBuffer.get(metaIndex + INDEX_KEYLEN); |
| int valLength = wrappedBuffer.metaBuffer.get(metaIndex + INDEX_VALLEN); |
| keyBuffer.reset(wrappedBuffer.buffer, pos + META_SIZE, keyLength); |
| valBuffer.reset(wrappedBuffer.buffer, pos + META_SIZE + keyLength, valLength); |
| |
| writer.append(keyBuffer, valBuffer); |
| numRecords++; |
| pos = wrappedBuffer.metaBuffer.get(metaIndex + INDEX_NEXT); |
| } |
| return numRecords; |
| } |
| |
| public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) { |
| long initialMemRequestMb = conf.getInt( |
| TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, |
| TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB_DEFAULT); |
    Preconditions.checkArgument(initialMemRequestMb > 0,
        TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB + " should be larger than 0");
| long reqBytes = initialMemRequestMb << 20; |
| LOG.info("Requested BufferSize (" + TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB |
| + ") : " + initialMemRequestMb); |
| return reqBytes; |
| } |
| |
| private boolean canSendDataOverDME() throws IOException { |
| if (dataViaEventsEnabled |
| && this.useCachedStream |
| && this.finalOutPath == null) { |
| |
| // It is possible that in-mem writer spilled over to disk. Need to use |
| // that path as finalOutPath and set its permission. |
| |
| if (((IFile.FileBackedInMemIFileWriter) writer).isDataFlushedToDisk()) { |
| this.finalOutPath = |
| ((IFile.FileBackedInMemIFileWriter) writer).getOutputPath(); |
| ensureSpillFilePermissions(finalOutPath, rfs); |
| additionalSpillBytesWritternCounter.increment(writer.getCompressedLength()); |
| } |
| } |
| |
| return (writer != null) && (dataViaEventsEnabled) |
| && (writer.getCompressedLength() <= dataViaEventsMaxSize); |
| } |
| |
| private ByteBuffer readDataForDME() throws IOException { |
| if (this.useCachedStream |
| && !((IFile.FileBackedInMemIFileWriter) writer).isDataFlushedToDisk()) { |
| return ((IFile.FileBackedInMemIFileWriter) writer).getData(); |
| } else { |
| try (FSDataInputStream inStream = rfs.open(finalOutPath)) { |
| byte[] buf = new byte[(int) writer.getCompressedLength()]; |
| IOUtils.readFully(inStream, buf, 0, (int) writer.getCompressedLength()); |
| additionalSpillBytesReadCounter.increment(writer.getCompressedLength()); |
| return ByteBuffer.wrap(buf); |
| } |
| } |
| } |
| |
| @Override |
| public List<Event> close() throws IOException, InterruptedException { |
| // In case there are buffers to be spilled, schedule spilling |
| scheduleSpill(true); |
| List<Event> eventList = Lists.newLinkedList(); |
| isShutdown.set(true); |
| spillLock.lock(); |
| try { |
| LOG.info( |
| sourceDestNameTrimmed + ": " + "Waiting for all spills to complete : Pending : " + pendingSpillCount.get()); |
| while (pendingSpillCount.get() != 0 && spillException == null) { |
| spillInProgress.await(); |
| } |
| } finally { |
| spillLock.unlock(); |
| } |
| if (spillException != null) { |
| LOG.error(sourceDestNameTrimmed + ": " + "Error during spill, throwing"); |
| // Assuming close will be called on the same thread as the write |
| cleanup(); |
| currentBuffer.cleanup(); |
| currentBuffer = null; |
| if (spillException instanceof IOException) { |
| throw (IOException) spillException; |
| } else { |
| throw new IOException(spillException); |
| } |
| } else { |
| LOG.info(sourceDestNameTrimmed + ": " + "All spills complete"); |
| // Assuming close will be called on the same thread as the write |
| cleanup(); |
| |
| List<Event> events = Lists.newLinkedList(); |
| if (!pipelinedShuffle) { |
| if (skipBuffers) { |
| writer.close(); |
| long rawLen = writer.getRawLength(); |
| long compLen = writer.getCompressedLength(); |
| |
| BitSet emptyPartitions = new BitSet(); |
| if (outputRecordsCounter.getValue() == 0) { |
| emptyPartitions.set(0); |
| } |
| if (reportPartitionStats()) { |
| if (outputRecordsCounter.getValue() > 0) { |
| sizePerPartition[0] = rawLen; |
| } |
| } |
| cleanupCurrentBuffer(); |
| |
| if (outputRecordsCounter.getValue() > 0) { |
| outputBytesWithOverheadCounter.increment(rawLen); |
| fileOutputBytesCounter.increment(compLen + indexFileSizeEstimate); |
| } |
| eventList.add(generateVMEvent()); |
| |
| if (!canSendDataOverDME()) { |
| TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen); |
| TezSpillRecord sr = new TezSpillRecord(1); |
| sr.putIndex(rec, 0); |
| finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate); |
| sr.writeToFile(finalIndexPath, conf, localFs); |
| } |
| eventList.add(generateDMEvent(false, -1, false, outputContext |
| .getUniqueIdentifier(), emptyPartitions)); |
| |
| return eventList; |
| } |
| |
| /* |
| 1. Final merge enabled |
| - When lots of spills are there, mergeAll, generate events and return |
| - If there are no existing spills, check for final spill and generate events |
| 2. Final merge disabled |
| - If finalSpill generated data, generate events and return |
| - If finalSpill did not generate data, it would automatically populate events |
| */ |
| if (isFinalMergeEnabled) { |
| if (numSpills.get() > 0) { |
| mergeAll(); |
| } else { |
| finalSpill(); |
| } |
| updateTezCountersAndNotify(); |
| eventList.add(generateVMEvent()); |
| eventList.add(generateDMEvent()); |
| } else { |
| // if no data is generated, finalSpill would create VMEvent & add to finalEvents |
| SpillResult result = finalSpill(); |
| if (result != null) { |
| updateTezCountersAndNotify(); |
| // Generate vm event |
| finalEvents.add(generateVMEvent()); |
| |
| // compute empty partitions based on spill result and generate DME |
| int spillNum = numSpills.get() - 1; |
| SpillCallback callback = new SpillCallback(spillNum); |
| callback.computePartitionStats(result); |
| BitSet emptyPartitions = getEmptyPartitions(callback.getRecordsPerPartition()); |
| String pathComponent = generatePathComponent(outputContext.getUniqueIdentifier(), spillNum); |
| Event finalEvent = generateDMEvent(true, spillNum, |
| true, pathComponent, emptyPartitions); |
| finalEvents.add(finalEvent); |
| } |
| //all events to be sent out are in finalEvents. |
| eventList.addAll(finalEvents); |
| } |
| cleanupCurrentBuffer(); |
| return eventList; |
| } |
| |
      // For the pipelined case, send out an event in case finalSpill() generated a spill file.
| if (finalSpill() != null) { |
| // VertexManagerEvent is only sent at the end and thus sizePerPartition is used |
| // for the sum of all spills. |
| mayBeSendEventsForSpill(currentBuffer.recordsPerPartition, |
| sizePerPartition, numSpills.get() - 1, true); |
| } |
| updateTezCountersAndNotify(); |
| cleanupCurrentBuffer(); |
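      // For pipelined shuffle, per-spill events have already been sent out as
      // each spill completed, so there is nothing further to return here.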
| return events; |
| } |
| } |
| |
| private BitSet getEmptyPartitions(int[] recordsPerPartition) { |
| Preconditions.checkArgument(recordsPerPartition != null, "records per partition can not be null"); |
| BitSet emptyPartitions = new BitSet(); |
| for (int i = 0; i < numPartitions; i++) { |
      if (recordsPerPartition[i] == 0) {
| emptyPartitions.set(i); |
| } |
| } |
| return emptyPartitions; |
| } |
| |
| public boolean reportDetailedPartitionStats() { |
| return reportPartitionStats.isPrecise(); |
| } |
| |
| private Event generateVMEvent() throws IOException { |
| return ShuffleUtils.generateVMEvent(outputContext, this.sizePerPartition, |
| this.reportDetailedPartitionStats(), deflater.get()); |
| } |
| |
| private Event generateDMEvent() throws IOException { |
| BitSet emptyPartitions = getEmptyPartitions(numRecordsPerPartition); |
| return generateDMEvent(false, -1, false, outputContext.getUniqueIdentifier(), emptyPartitions); |
| } |
| |
| private Event generateDMEvent(boolean addSpillDetails, int spillId, |
| boolean isLastSpill, String pathComponent, BitSet emptyPartitions) |
| throws IOException { |
| |
| outputContext.notifyProgress(); |
| DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto |
| .newBuilder(); |
| if (numPartitions == 1) { |
| payloadBuilder.setNumRecord((int) outputRecordsCounter.getValue()); |
| } |
| |
| String host = getHost(); |
| if (emptyPartitions.cardinality() != 0) { |
| // Empty partitions exist |
| ByteString emptyPartitionsByteString = |
| TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray |
| (emptyPartitions), deflater.get()); |
| payloadBuilder.setEmptyPartitions(emptyPartitionsByteString); |
| } |
| |
| if (emptyPartitions.cardinality() != numPartitions) { |
| // Populate payload only if at least 1 partition has data |
| payloadBuilder.setHost(host); |
| payloadBuilder.setPort(getShufflePort()); |
| payloadBuilder.setPathComponent(pathComponent); |
| } |
| |
| if (addSpillDetails) { |
| payloadBuilder.setSpillId(spillId); |
| payloadBuilder.setLastEvent(isLastSpill); |
| } |
| |
| if (canSendDataOverDME()) { |
| ShuffleUserPayloads.DataProto.Builder dataProtoBuilder = ShuffleUserPayloads.DataProto.newBuilder(); |
| dataProtoBuilder.setData(ByteString.copyFrom(readDataForDME())); |
| dataProtoBuilder.setRawLength((int) this.writer.getRawLength()); |
| |
| dataProtoBuilder.setCompressedLength((int) this.writer.getCompressedLength()); |
| payloadBuilder.setData(dataProtoBuilder.build()); |
| |
| this.dataViaEventSize.increment(this.writer.getCompressedLength()); |
| LOG.debug("payload packed in DME, dataSize: " + this.writer.getCompressedLength()); |
| } |
| |
| ByteBuffer payload = payloadBuilder.build().toByteString().asReadOnlyByteBuffer(); |
| return CompositeDataMovementEvent.create(0, numPartitions, payload); |
| } |
| |
| private void cleanupCurrentBuffer() { |
| currentBuffer.cleanup(); |
| currentBuffer = null; |
| } |
| |
| private void cleanup() { |
| if (spillExecutor != null) { |
| spillExecutor.shutdownNow(); |
| } |
| for (int i = 0; i < buffers.length; i++) { |
| if (buffers[i] != null && buffers[i] != currentBuffer) { |
| buffers[i].cleanup(); |
| buffers[i] = null; |
| } |
| } |
| availableBuffers.clear(); |
| } |
| |
| private SpillResult finalSpill() throws IOException { |
| if (currentBuffer.nextPosition == 0) { |
| if (pipelinedShuffle || !isFinalMergeEnabled) { |
| List<Event> eventList = Lists.newLinkedList(); |
| eventList.add(ShuffleUtils.generateVMEvent(outputContext, |
| reportPartitionStats() ? new long[numPartitions] : null, |
| reportDetailedPartitionStats(), deflater.get())); |
| if (localOutputRecordsCounter == 0 && outputLargeRecordsCounter.getValue() == 0) { |
| // Should send this event (all empty partitions) only when no records are written out. |
| BitSet emptyPartitions = new BitSet(numPartitions); |
| emptyPartitions.flip(0, numPartitions); |
| eventList.add(generateDMEvent(true, numSpills.get(), true, |
| null, emptyPartitions)); |
| } |
| if (pipelinedShuffle) { |
| outputContext.sendEvents(eventList); |
| } else if (!isFinalMergeEnabled) { |
| finalEvents.addAll(0, eventList); |
| } |
| } |
| return null; |
| } else { |
| updateGlobalStats(currentBuffer); |
| filledBuffers.add(currentBuffer); |
| |
      // Set up the output file and index file.
| SpillPathDetails spillPathDetails = getSpillPathDetails(true, -1); |
| SpillCallable spillCallable = new SpillCallable(filledBuffers, |
| codec, null, spillPathDetails); |
| try { |
| SpillResult spillResult = spillCallable.call(); |
| |
| fileOutputBytesCounter.increment(spillResult.spillSize); |
| fileOutputBytesCounter.increment(indexFileSizeEstimate); |
| return spillResult; |
| } catch (Exception ex) { |
| throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex); |
| } |
| } |
| |
| } |
| |
| /** |
| * Set up spill output file, index file details. |
| * |
| * @param isFinalSpill |
| * @param expectedSpillSize |
| * @return SpillPathDetails |
| * @throws IOException |
| */ |
| private SpillPathDetails getSpillPathDetails(boolean isFinalSpill, long expectedSpillSize) |
| throws IOException { |
| int spillNumber = numSpills.getAndIncrement(); |
| return getSpillPathDetails(isFinalSpill, expectedSpillSize, spillNumber); |
| } |
| |
| /** |
| * Set up spill output file, index file details. |
| * |
| * @param isFinalSpill |
| * @param expectedSpillSize |
| * @param spillNumber |
| * @return SpillPathDetails |
| * @throws IOException |
| */ |
| private SpillPathDetails getSpillPathDetails(boolean isFinalSpill, long expectedSpillSize, |
| int spillNumber) throws IOException { |
| long spillSize = (expectedSpillSize < 0) ? |
| (currentBuffer.nextPosition + numPartitions * APPROX_HEADER_LENGTH) : expectedSpillSize; |
| |
| Path outputFilePath = null; |
| Path indexFilePath = null; |
| |
| if (!pipelinedShuffle && isFinalMergeEnabled) { |
| if (isFinalSpill) { |
| outputFilePath = outputFileHandler.getOutputFileForWrite(spillSize); |
| indexFilePath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate); |
| |
| //Setting this for tests |
| finalOutPath = outputFilePath; |
| finalIndexPath = indexFilePath; |
| } else { |
| outputFilePath = outputFileHandler.getSpillFileForWrite(spillNumber, spillSize); |
| } |
| } else { |
| outputFilePath = outputFileHandler.getSpillFileForWrite(spillNumber, spillSize); |
| indexFilePath = outputFileHandler.getSpillIndexFileForWrite(spillNumber, indexFileSizeEstimate); |
| } |
| |
| return new SpillPathDetails(outputFilePath, indexFilePath, spillNumber); |
| } |
| |
| private void mergeAll() throws IOException { |
| long expectedSize = spilledSize; |
| if (currentBuffer.nextPosition != 0) { |
| expectedSize += currentBuffer.nextPosition - (currentBuffer.numRecords * META_SIZE) |
| - currentBuffer.skipSize + numPartitions * APPROX_HEADER_LENGTH; |
| // Update final statistics. |
| updateGlobalStats(currentBuffer); |
| } |
| |
| SpillPathDetails spillPathDetails = getSpillPathDetails(true, expectedSize); |
| finalIndexPath = spillPathDetails.indexFilePath; |
| finalOutPath = spillPathDetails.outputFilePath; |
| |
| TezSpillRecord finalSpillRecord = new TezSpillRecord(numPartitions); |
| |
| DataInputBuffer keyBuffer = new DataInputBuffer(); |
| DataInputBuffer valBuffer = new DataInputBuffer(); |
| |
| DataInputBuffer keyBufferIFile = new DataInputBuffer(); |
| DataInputBuffer valBufferIFile = new DataInputBuffer(); |
| |
| FSDataOutputStream out = null; |
| try { |
| out = rfs.create(finalOutPath); |
| ensureSpillFilePermissions(finalOutPath, rfs); |
| Writer writer = null; |
| |
| for (int i = 0; i < numPartitions; i++) { |
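        // First append any records for this partition still sitting in the
        // current in-memory buffer, then stream the partition's segment from
        // every intermediate spill file into the merged output.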
| long segmentStart = out.getPos(); |
| if (numRecordsPerPartition[i] == 0) { |
| LOG.info( |
| sourceDestNameTrimmed + ": " + "Skipping partition: " + i + " in final merge since it has no records"); |
| continue; |
| } |
| writer = new Writer(keySerialization, valSerialization, out, keyClass, valClass, codec, null, null); |
| try { |
| if (currentBuffer.nextPosition != 0 |
| && currentBuffer.partitionPositions[i] != WrappedBuffer.PARTITION_ABSENT_POSITION) { |
| // Write current buffer. |
| writePartition(currentBuffer.partitionPositions[i], currentBuffer, writer, keyBuffer, |
| valBuffer); |
| } |
| synchronized (spillInfoList) { |
| for (SpillInfo spillInfo : spillInfoList) { |
| TezIndexRecord indexRecord = spillInfo.spillRecord.getIndex(i); |
| if (indexRecord.getPartLength() == 0) { |
| // Skip empty partitions within a spill |
| continue; |
| } |
| FSDataInputStream in = rfs.open(spillInfo.outPath); |
| in.seek(indexRecord.getStartOffset()); |
| IFile.Reader reader = new IFile.Reader(in, indexRecord.getPartLength(), codec, null, |
| additionalSpillBytesReadCounter, ifileReadAhead, ifileReadAheadLength, |
| ifileBufferSize); |
| while (reader.nextRawKey(keyBufferIFile)) { |
                // TODO Inefficient. If spills are not compressed, a direct copy should be possible
                // given the current IFile format. Also extremely inefficient for large records,
                // since the entire record will be read into memory.
| reader.nextRawValue(valBufferIFile); |
| writer.append(keyBufferIFile, valBufferIFile); |
| } |
| reader.close(); |
| } |
| } |
| writer.close(); |
| fileOutputBytesCounter.increment(writer.getCompressedLength()); |
| TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(), |
| writer.getCompressedLength()); |
| writer = null; |
| finalSpillRecord.putIndex(indexRecord, i); |
| outputContext.notifyProgress(); |
| } finally { |
| if (writer != null) { |
| writer.close(); |
| } |
| } |
| } |
| } finally { |
| if (out != null) { |
| out.close(); |
| } |
| deleteIntermediateSpills(); |
| } |
| finalSpillRecord.writeToFile(finalIndexPath, conf, localFs); |
| fileOutputBytesCounter.increment(indexFileSizeEstimate); |
| LOG.info(sourceDestNameTrimmed + ": " + "Finished final spill after merging : " + numSpills.get() + " spills"); |
| } |
| |
| private void deleteIntermediateSpills() { |
| // Delete the intermediate spill files |
| synchronized (spillInfoList) { |
| for (SpillInfo spill : spillInfoList) { |
| try { |
| rfs.delete(spill.outPath, false); |
| } catch (IOException e) { |
| LOG.warn("Unable to delete intermediate spill " + spill.outPath, e); |
| } |
| } |
| } |
| } |
| |
| private void writeLargeRecord(final Object key, final Object value, final int partition) |
| throws IOException { |
| numAdditionalSpillsCounter.increment(1); |
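    // A record too large for any buffer gets a spill file of its own: only the
    // record's partition has data, and every other partition is marked empty.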
| long size = sizePerBuffer - (currentBuffer.numRecords * META_SIZE) - currentBuffer.skipSize |
| + numPartitions * APPROX_HEADER_LENGTH; |
| SpillPathDetails spillPathDetails = getSpillPathDetails(false, size); |
| int spillIndex = spillPathDetails.spillIndex; |
| FSDataOutputStream out = null; |
| long outSize = 0; |
| try { |
| final TezSpillRecord spillRecord = new TezSpillRecord(numPartitions); |
| final Path outPath = spillPathDetails.outputFilePath; |
| out = rfs.create(outPath); |
| ensureSpillFilePermissions(outPath, rfs); |
| BitSet emptyPartitions = null; |
| if (pipelinedShuffle || !isFinalMergeEnabled) { |
| emptyPartitions = new BitSet(numPartitions); |
| } |
| for (int i = 0; i < numPartitions; i++) { |
| final long recordStart = out.getPos(); |
| if (i == partition) { |
| spilledRecordsCounter.increment(1); |
| Writer writer = null; |
| try { |
| writer = new IFile.Writer(keySerialization, valSerialization, out, keyClass, valClass, codec, null, null); |
| writer.append(key, value); |
| outputLargeRecordsCounter.increment(1); |
| numRecordsPerPartition[i]++; |
| if (reportPartitionStats()) { |
| sizePerPartition[i] += writer.getRawLength(); |
| } |
| writer.close(); |
| synchronized (additionalSpillBytesWritternCounter) { |
| additionalSpillBytesWritternCounter.increment(writer.getCompressedLength()); |
| } |
| TezIndexRecord indexRecord = new TezIndexRecord(recordStart, writer.getRawLength(), |
| writer.getCompressedLength()); |
| spillRecord.putIndex(indexRecord, i); |
| outSize = writer.getCompressedLength(); |
| writer = null; |
| } finally { |
| if (writer != null) { |
| writer.close(); |
| } |
| } |
| } else { |
| if (emptyPartitions != null) { |
| emptyPartitions.set(i); |
| } |
| } |
| } |
| handleSpillIndex(spillPathDetails, spillRecord); |
| |
| mayBeSendEventsForSpill(emptyPartitions, sizePerPartition, |
| spillIndex, false); |
| |
| LOG.info(sourceDestNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file " |
| + spillIndex); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(sourceDestNameTrimmed + ": " + "LargeRecord Spill=" + spillIndex + ", indexPath=" |
| + spillPathDetails.indexFilePath + ", outputPath=" |
| + spillPathDetails.outputFilePath); |
| } |
| } finally { |
| if (out != null) { |
| out.close(); |
| } |
| } |
| } |
| |
| private void handleSpillIndex(SpillPathDetails spillPathDetails, TezSpillRecord spillRecord) |
| throws IOException { |
| if (spillPathDetails.indexFilePath != null) { |
| //write the index record |
| spillRecord.writeToFile(spillPathDetails.indexFilePath, conf, localFs); |
| } else { |
| //add to cache |
| SpillInfo spillInfo = new SpillInfo(spillRecord, spillPathDetails.outputFilePath); |
| spillInfoList.add(spillInfo); |
| numAdditionalSpillsCounter.increment(1); |
| } |
| } |
| |
| private class ByteArrayOutputStream extends OutputStream { |
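
    // Unlike java.io.ByteArrayOutputStream, this writes straight into
    // currentBuffer and, instead of growing, marks the buffer full so that the
    // caller can switch buffers or fall back to the large-record path.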
| |
| private final byte[] scratch = new byte[1]; |
| |
| @Override |
| public void write(int v) throws IOException { |
| scratch[0] = (byte) v; |
| write(scratch, 0, 1); |
| } |
| |
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
| if (currentBuffer.full) { |
| /* no longer do anything until reset */ |
| } else if (len > currentBuffer.availableSize) { |
| currentBuffer.full = true; /* stop working & signal we hit the end */ |
| } else { |
| System.arraycopy(b, off, currentBuffer.buffer, currentBuffer.nextPosition, len); |
| currentBuffer.nextPosition += len; |
| currentBuffer.availableSize -= len; |
| } |
| } |
| } |
| |
| private static class WrappedBuffer { |
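
    // Raw serialized bytes live in buffer; metaBuffer is an int view over the
    // same array, used for the per-record [keyLen, valLen, nextPos] metadata.
    // partitionPositions[i] is the head of partition i's record chain.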
| |
| private static final int PARTITION_ABSENT_POSITION = -1; |
| |
| private final int[] partitionPositions; |
| private final int[] recordsPerPartition; |
| // uncompressed size for each partition |
| private final long[] sizePerPartition; |
| private final int numPartitions; |
| private final int size; |
| |
| private byte[] buffer; |
| private IntBuffer metaBuffer; |
| |
| private int numRecords = 0; |
| private int skipSize = 0; |
| |
| private int nextPosition = 0; |
| private int availableSize; |
| private boolean full = false; |
| |
| WrappedBuffer(int numPartitions, int size) { |
| this.partitionPositions = new int[numPartitions]; |
| this.recordsPerPartition = new int[numPartitions]; |
| this.sizePerPartition = new long[numPartitions]; |
| this.numPartitions = numPartitions; |
| for (int i = 0; i < numPartitions; i++) { |
| this.partitionPositions[i] = PARTITION_ABSENT_POSITION; |
| this.recordsPerPartition[i] = 0; |
| this.sizePerPartition[i] = 0; |
| } |
| size = size - (size % INT_SIZE); |
| this.size = size; |
| this.buffer = new byte[size]; |
| this.metaBuffer = ByteBuffer.wrap(buffer).order(ByteOrder.nativeOrder()).asIntBuffer(); |
| availableSize = size; |
| } |
| |
| void reset() { |
| for (int i = 0; i < numPartitions; i++) { |
| this.partitionPositions[i] = PARTITION_ABSENT_POSITION; |
| this.recordsPerPartition[i] = 0; |
| this.sizePerPartition[i] = 0; |
| } |
| numRecords = 0; |
| nextPosition = 0; |
| skipSize = 0; |
| availableSize = size; |
| full = false; |
| } |
| |
| void cleanup() { |
| buffer = null; |
| metaBuffer = null; |
| } |
| } |
| |
| private String generatePathComponent(String uniqueId, int spillNumber) { |
| return (uniqueId + "_" + spillNumber); |
| } |
| |
| private List<Event> generateEventForSpill(BitSet emptyPartitions, long[] sizePerPartition, |
| int spillNumber, |
| boolean isFinalUpdate) throws IOException { |
| List<Event> eventList = Lists.newLinkedList(); |
| //Send out an event for consuming. |
| String pathComponent = generatePathComponent(outputContext.getUniqueIdentifier(), spillNumber); |
| if (isFinalUpdate) { |
| eventList.add(ShuffleUtils.generateVMEvent(outputContext, |
| sizePerPartition, reportDetailedPartitionStats(), deflater.get())); |
| } |
| Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate, |
| pathComponent, emptyPartitions); |
| eventList.add(compEvent); |
| return eventList; |
| } |
| |
| private void mayBeSendEventsForSpill( |
| BitSet emptyPartitions, long[] sizePerPartition, |
| int spillNumber, boolean isFinalUpdate) { |
| if (!pipelinedShuffle) { |
| if (isFinalMergeEnabled) { |
| return; |
| } |
| } |
| List<Event> events = null; |
| try { |
| events = generateEventForSpill(emptyPartitions, sizePerPartition, spillNumber, |
| isFinalUpdate); |
| LOG.info(sourceDestNameTrimmed + ": " + "Adding spill event for spill" |
| + " (final update=" + isFinalUpdate + "), spillId=" + spillNumber); |
| if (pipelinedShuffle) { |
| //Send out an event for consuming. |
| outputContext.sendEvents(events); |
| } else if (!isFinalMergeEnabled) { |
| this.finalEvents.addAll(events); |
| } |
| } catch (IOException e) { |
| LOG.error(sourceDestNameTrimmed + ": " + "Error in sending pipelined events", e); |
| outputContext.reportFailure(TaskFailureType.NON_FATAL, e, |
| "Error in sending events."); |
| } |
| } |
| |
| private void mayBeSendEventsForSpill(int[] recordsPerPartition, |
| long[] sizePerPartition, int spillNumber, boolean isFinalUpdate) { |
| BitSet emptyPartitions = getEmptyPartitions(recordsPerPartition); |
| mayBeSendEventsForSpill(emptyPartitions, sizePerPartition, spillNumber, |
| isFinalUpdate); |
| } |
| |
| private class SpillCallback implements FutureCallback<SpillResult> { |
| |
| private final int spillNumber; |
    private int[] recordsPerPartition;
    private long[] sizePerPartition;
| |
| SpillCallback(int spillNumber) { |
| this.spillNumber = spillNumber; |
| } |
| |
| void computePartitionStats(SpillResult result) { |
| if (result.filledBuffers.size() == 1) { |
| recordsPerPartition = result.filledBuffers.get(0).recordsPerPartition; |
| sizePerPartition = result.filledBuffers.get(0).sizePerPartition; |
| } else { |
| recordsPerPartition = new int[numPartitions]; |
| sizePerPartition = new long[numPartitions]; |
| for (WrappedBuffer buffer : result.filledBuffers) { |
| for (int i = 0; i < numPartitions; ++i) { |
| recordsPerPartition[i] += buffer.recordsPerPartition[i]; |
| sizePerPartition[i] += buffer.sizePerPartition[i]; |
| } |
| } |
| } |
| } |
| |
| int[] getRecordsPerPartition() { |
| return recordsPerPartition; |
| } |
| |
| @Override |
| public void onSuccess(SpillResult result) { |
| synchronized (UnorderedPartitionedKVWriter.this) { |
| spilledSize += result.spillSize; |
| } |
| |
| computePartitionStats(result); |
| |
| mayBeSendEventsForSpill(recordsPerPartition, sizePerPartition, spillNumber, false); |
| |
| try { |
| for (WrappedBuffer buffer : result.filledBuffers) { |
| buffer.reset(); |
| availableBuffers.add(buffer); |
| } |
| } catch (Throwable e) { |
| LOG.error(sourceDestNameTrimmed + ": Failure while attempting to reset buffer after spill", e); |
| outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Failure while attempting to reset buffer after spill"); |
| } |
| |
| if (!pipelinedShuffle && isFinalMergeEnabled) { |
| synchronized(additionalSpillBytesWritternCounter) { |
| additionalSpillBytesWritternCounter.increment(result.spillSize); |
| } |
| } else { |
| synchronized(fileOutputBytesCounter) { |
| fileOutputBytesCounter.increment(indexFileSizeEstimate); |
| fileOutputBytesCounter.increment(result.spillSize); |
| } |
| } |
| |
| spillLock.lock(); |
| try { |
| if (pendingSpillCount.decrementAndGet() == 0) { |
| spillInProgress.signal(); |
| } |
| } finally { |
| spillLock.unlock(); |
| availableSlots.release(); |
| } |
| } |
| |
| @Override |
| public void onFailure(Throwable t) { |
      // spillException is set so that an exception can be thrown back to the
      // user code; this requires synchronization.
      // Consider removing it in favor of having Tez kill the task.
| LOG.error(sourceDestNameTrimmed + ": " + "Failure while spilling to disk", t); |
| spillException = t; |
| outputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Failure while spilling to disk"); |
| spillLock.lock(); |
| try { |
| spillInProgress.signal(); |
| } finally { |
| spillLock.unlock(); |
| availableSlots.release(); |
| } |
| } |
| } |
| |
| private static class SpillResult { |
| final long spillSize; |
| final List<WrappedBuffer> filledBuffers; |
| |
| SpillResult(long size, List<WrappedBuffer> filledBuffers) { |
| this.spillSize = size; |
| this.filledBuffers = filledBuffers; |
| } |
| } |
| |
| @VisibleForTesting |
| static class SpillInfo { |
| final TezSpillRecord spillRecord; |
| final Path outPath; |
| |
| SpillInfo(TezSpillRecord spillRecord, Path outPath) { |
| this.spillRecord = spillRecord; |
| this.outPath = outPath; |
| } |
| } |
| |
| @VisibleForTesting |
| String getHost() { |
| return outputContext.getExecutionContext().getHostName(); |
| } |
| |
| @VisibleForTesting |
| int getShufflePort() throws IOException { |
| String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, |
| TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); |
| ByteBuffer shuffleMetadata = outputContext |
| .getServiceProviderMetaData(auxiliaryService); |
| int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata); |
| return shufflePort; |
| } |
| |
| @InterfaceAudience.Private |
| static class SpillPathDetails { |
| final Path indexFilePath; |
| final Path outputFilePath; |
| final int spillIndex; |
| |
| SpillPathDetails(Path outputFilePath, Path indexFilePath, int spillIndex) { |
| this.outputFilePath = outputFilePath; |
| this.indexFilePath = indexFilePath; |
| this.spillIndex = spillIndex; |
| } |
| } |
| } |