| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.tez.runtime.library.common.writers; |
| |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.nio.IntBuffer; |
| import java.util.ArrayList; |
| import java.util.BitSet; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.DataInputBuffer; |
| import org.apache.hadoop.io.compress.CompressionCodec; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.tez.common.TezCommonUtils; |
| import org.apache.tez.common.TezJobConfig; |
| import org.apache.tez.common.TezUtils; |
| import org.apache.tez.common.counters.TaskCounter; |
| import org.apache.tez.common.counters.TezCounter; |
| import org.apache.tez.runtime.api.Event; |
| import org.apache.tez.runtime.api.TezOutputContext; |
| import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; |
| import org.apache.tez.runtime.library.common.Constants; |
| import org.apache.tez.runtime.library.common.sort.impl.IFile; |
| import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; |
| import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; |
| import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; |
| import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils; |
| import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import com.google.protobuf.ByteString; |
| |
| public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWriter { |
| |
  private static final Log LOG = LogFactory.getLog(UnorderedPartitionedKVWriter.class);

  private static final int INT_SIZE = 4;
  private static final int NUM_META = 3; // Number of meta fields.
  private static final int INDEX_KEYLEN = 0; // KeyLength index
  private static final int INDEX_VALLEN = 1; // ValLength index
  private static final int INDEX_NEXT = 2; // Next Record Index.
  private static final int META_SIZE = NUM_META * INT_SIZE; // Size of total meta-data

  // Rough per-partition IFile header overhead used when estimating spill file sizes.
  private final static int APPROX_HEADER_LENGTH = 150;

  // Maybe setup a separate statistics class which can be shared between the
  // buffer and the main path instead of having multiple arrays.

  // Total memory budget across all buffers, fixed at construction.
  private final long availableMemory;
  @VisibleForTesting
  final WrappedBuffer[] buffers;
  // Buffers handed back by completed spills, ready for reuse by the writer thread.
  @VisibleForTesting
  final BlockingQueue<WrappedBuffer> availableBuffers;
  // Stream pair through which the key/value serializers write into currentBuffer.
  private final ByteArrayOutputStream baos;
  private final DataOutputStream dos;
  @VisibleForTesting
  WrappedBuffer currentBuffer;
  // Raw local filesystem used for spill and final output files.
  private final FileSystem rfs;

  // Index records for every completed (non-final) spill; consumed by mergeAll().
  private final List<SpillInfo> spillInfoList = Collections
      .synchronizedList(new ArrayList<SpillInfo>());

  // Single-threaded executor on which SpillCallables run asynchronously.
  private final ListeningExecutorService spillExecutor;

  // Aggregate record count per partition (includes large records written directly to disk).
  private final int[] numRecordsPerPartition;
  // Compressed bytes reported by completed spills; updated from the spill callback.
  private volatile long spilledSize = 0;

  /**
   * Represents final number of records written (spills are not counted)
   */
  protected final TezCounter outputLargeRecordsCounter;

  @VisibleForTesting
  int numBuffers;
  @VisibleForTesting
  int sizePerBuffer;
  @VisibleForTesting
  int numInitializedBuffers;

  // First failure observed by a spill; checked on every write() and on close().
  private Throwable spillException;
  private AtomicBoolean isShutdown = new AtomicBoolean(false);
  @VisibleForTesting
  final AtomicInteger numSpills = new AtomicInteger(0);
  // Spills submitted but not yet completed; close() blocks until this reaches 0.
  private final AtomicInteger pendingSpillCount = new AtomicInteger(0);

  // Lock/condition used by close() to wait for in-flight spills to finish.
  private final ReentrantLock spillLock = new ReentrantLock();
  private final Condition spillInProgress = spillLock.newCondition();
| |
| public UnorderedPartitionedKVWriter(TezOutputContext outputContext, Configuration conf, |
| int numOutputs, long availableMemoryBytes) throws IOException { |
| super(outputContext, conf, numOutputs); |
| Preconditions.checkArgument(availableMemoryBytes > 0, "availableMemory should not be > 0 bytes"); |
| // Ideally, should be significantly larger. |
| availableMemory = availableMemoryBytes; |
| |
| // Allow unit tests to control the buffer sizes. |
| int maxSingleBufferSizeBytes = conf.getInt( |
| TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, Integer.MAX_VALUE); |
| computeNumBuffersAndSize(maxSingleBufferSizeBytes); |
| LOG.info("Running with numBuffers=" + numBuffers + ", sizePerBuffer=" + sizePerBuffer); |
| availableBuffers = new LinkedBlockingQueue<WrappedBuffer>(); |
| buffers = new WrappedBuffer[numBuffers]; |
| // Set up only the first buffer to start with. |
| buffers[0] = new WrappedBuffer(numOutputs, sizePerBuffer); |
| numInitializedBuffers = 1; |
| LOG.info("Initialize Buffer #" + numInitializedBuffers + " with size=" + sizePerBuffer); |
| currentBuffer = buffers[0]; |
| baos = new ByteArrayOutputStream(); |
| dos = new DataOutputStream(baos); |
| keySerializer.open(dos); |
| valSerializer.open(dos); |
| rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw(); |
| |
| ExecutorService executor = Executors.newFixedThreadPool( |
| 1, |
| new ThreadFactoryBuilder() |
| .setDaemon(true) |
| .setNameFormat( |
| "UnorderedOutSpiller [" |
| + TezUtils.cleanVertexName(outputContext.getDestinationVertexName()) + "]") |
| .build()); |
| spillExecutor = MoreExecutors.listeningDecorator(executor); |
| numRecordsPerPartition = new int[numPartitions]; |
| |
| outputLargeRecordsCounter = outputContext.getCounters().findCounter( |
| TaskCounter.OUTPUT_LARGE_RECORDS); |
| } |
| |
| private void computeNumBuffersAndSize(int bufferLimit) { |
| numBuffers = Math.max(2, (int) (availableMemory / bufferLimit) |
| + ((availableMemory % bufferLimit) == 0 ? 0 : 1)); |
| sizePerBuffer = (int) (availableMemory / numBuffers); |
| sizePerBuffer = sizePerBuffer - (sizePerBuffer % INT_SIZE); |
| } |
| |
| @Override |
| public void write(Object key, Object value) throws IOException { |
| // Skipping checks for key-value types. IFile takes care of these, but should be removed from |
| // there as well. |
| |
| // How expensive are checks like these ? |
| if (isShutdown.get()) { |
| throw new RuntimeException("Writer already closed"); |
| } |
| if (spillException != null) { |
| // Already reported as a fatalError - report to the user code |
| throw new IOException("Exception during spill", new IOException(spillException)); |
| } |
| int partition = partitioner.getPartition(key, value, numPartitions); |
| write(key, value, partition); |
| } |
| |
  /**
   * Serializes one key-value pair into the current buffer for the given partition.
   *
   * Buffer layout: each record is [3-int meta][key bytes][value bytes], with the meta block
   * aligned to a 4-byte boundary. Records of one partition form a linked list threaded through
   * the INDEX_NEXT meta slot, newest record first. If the record does not fit, the method either
   * rolls to a fresh buffer and retries, or (if the record cannot fit in any buffer) writes it
   * straight to disk as a "large record".
   */
  @SuppressWarnings("unchecked")
  private void write(Object key, Object value, int partition) throws IOException {
    // Wrap to 4 byte (Int) boundary for metaData
    int mod = currentBuffer.nextPosition % INT_SIZE;
    int metaSkip = mod == 0 ? 0 : (INT_SIZE - mod);
    if (currentBuffer.availableSize < (META_SIZE + metaSkip)) {
      // Move over to the next buffer.
      metaSkip = 0;
      setupNextBuffer();
    }
    currentBuffer.nextPosition += metaSkip;
    int metaStart = currentBuffer.nextPosition;
    currentBuffer.availableSize -= (META_SIZE + metaSkip);
    currentBuffer.nextPosition += META_SIZE;
    try {
      // Serializer writes through baos/dos into currentBuffer; throws BufferTooSmallException
      // when the buffer cannot hold the serialized bytes.
      keySerializer.serialize(key);
    } catch (BufferTooSmallException e) {
      if (metaStart == 0) { // Started writing at the start of the buffer. Write Key to disk.
        // Key too large for any buffer. Write entire record to disk.
        currentBuffer.reset();
        writeLargeRecord(key, value, partition, numSpills.incrementAndGet());
        return;
      } else { // Exceeded length on current buffer.
        // Try resetting the buffer to the next one, if this was not the start of a buffer,
        // and begin spilling the current buffer to disk if it has any records.
        setupNextBuffer();
        write(key, value, partition);
        return;
      }
    }
    int valStart = currentBuffer.nextPosition;
    try {
      valSerializer.serialize(value);
    } catch (BufferTooSmallException e) {
      // Value too large for current buffer, or K-V too large for entire buffer.
      if (metaStart == 0) {
        // Key + Value too large for a single buffer.
        currentBuffer.reset();
        writeLargeRecord(key, value, partition, numSpills.incrementAndGet());
        return;
      } else { // Exceeded length on current buffer.
        // Try writing key+value to a new buffer - will fall back to disk if that fails.
        setupNextBuffer();
        write(key, value, partition);
        return;
      }
    }

    // Meta-data updates: record key/value lengths and prepend this record to the partition's
    // linked list (INDEX_NEXT points at the previous head).
    int metaIndex = metaStart / INT_SIZE;
    int indexNext = currentBuffer.partitionPositions[partition];

    currentBuffer.metaBuffer.put(metaIndex + INDEX_KEYLEN, (valStart - (metaStart + META_SIZE)));
    currentBuffer.metaBuffer.put(metaIndex + INDEX_VALLEN, (currentBuffer.nextPosition - valStart));
    currentBuffer.metaBuffer.put(metaIndex + INDEX_NEXT, indexNext);
    currentBuffer.skipSize += metaSkip; // For size estimation
    // Update stats on number of records
    outputRecordBytesCounter.increment(currentBuffer.nextPosition - (metaStart + META_SIZE));
    outputBytesWithOverheadCounter.increment((currentBuffer.nextPosition - metaStart) + metaSkip);
    outputRecordsCounter.increment(1);
    currentBuffer.partitionPositions[partition] = metaStart;
    currentBuffer.recordsPerPartition[partition]++;
    currentBuffer.numRecords++;

  }
| |
| private void setupNextBuffer() throws IOException { |
| |
| if (currentBuffer.numRecords == 0) { |
| currentBuffer.reset(); |
| } else { |
| // Update overall stats |
| LOG.info("Moving to next buffer and triggering spill"); |
| updateGlobalStats(currentBuffer); |
| |
| pendingSpillCount.incrementAndGet(); |
| |
| ListenableFuture<SpillResult> future = spillExecutor.submit(new SpillCallable(currentBuffer, |
| numSpills.incrementAndGet(), codec, spilledRecordsCounter, false)); |
| Futures.addCallback(future, new SpillCallback(numSpills.get())); |
| |
| WrappedBuffer wb = getNextAvailableBuffer(); |
| currentBuffer = wb; |
| } |
| } |
| |
| private void updateGlobalStats(WrappedBuffer buffer) { |
| for (int i = 0; i < numPartitions; i++) { |
| numRecordsPerPartition[i] += buffer.recordsPerPartition[i]; |
| } |
| } |
| |
| private WrappedBuffer getNextAvailableBuffer() throws IOException { |
| if (availableBuffers.peek() == null) { |
| if (numInitializedBuffers < numBuffers) { |
| buffers[numInitializedBuffers] = new WrappedBuffer(numPartitions, sizePerBuffer); |
| numInitializedBuffers++; |
| return buffers[numInitializedBuffers - 1]; |
| } else { |
| // All buffers initialized, and none available right now. Wait |
| try { |
| return availableBuffers.take(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException("Interrupted while waiting for next buffer", e); |
| } |
| } |
| } else { |
| return availableBuffers.poll(); |
| } |
| } |
| |
| // All spills using compression for now. |
| private class SpillCallable implements Callable<SpillResult> { |
| |
| private final WrappedBuffer wrappedBuffer; |
| private final CompressionCodec codec; |
| private final TezCounter numRecordsCounter; |
| private final int spillNumber; |
| private final boolean isFinalSpill; |
| |
| public SpillCallable(WrappedBuffer wrappedBuffer, int spillNumber, CompressionCodec codec, |
| TezCounter numRecordsCounter, boolean isFinal) { |
| this.wrappedBuffer = wrappedBuffer; |
| this.codec = codec; |
| this.numRecordsCounter = numRecordsCounter; |
| this.spillNumber = spillNumber; |
| this.isFinalSpill = isFinal; |
| } |
| |
| @Override |
| public SpillResult call() throws IOException { |
| // This should not be called with an empty buffer. Check before invoking. |
| |
| // Number of parallel spills determined by number of threads. |
| // Last spill synchronization handled separately. |
| SpillResult spillResult = null; |
| long spillSize = wrappedBuffer.nextPosition + numPartitions * APPROX_HEADER_LENGTH; |
| Path outPath = null; |
| if (isFinalSpill) { |
| outPath = outputFileHandler.getOutputFileForWrite(spillSize); |
| } else { |
| outPath = outputFileHandler.getSpillFileForWrite(spillNumber, spillSize); |
| } |
| FSDataOutputStream out = rfs.create(outPath); |
| TezSpillRecord spillRecord = new TezSpillRecord(numPartitions); |
| DataInputBuffer key = new DataInputBuffer(); |
| DataInputBuffer val = new DataInputBuffer(); |
| for (int i = 0; i < numPartitions; i++) { |
| IFile.Writer writer = null; |
| try { |
| long segmentStart = out.getPos(); |
| if (wrappedBuffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) { |
| // Skip empty partition. |
| continue; |
| } |
| writer = new Writer(conf, out, keyClass, valClass, codec, numRecordsCounter, null); |
| writePartition(wrappedBuffer.partitionPositions[i], wrappedBuffer, writer, key, val); |
| writer.close(); |
| if (isFinalSpill) { |
| fileOutputBytesCounter.increment(writer.getCompressedLength()); |
| } else { |
| additionalSpillBytesWritternCounter.increment(writer.getCompressedLength()); |
| } |
| spillResult = new SpillResult(writer.getCompressedLength(), this.wrappedBuffer); |
| TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(), |
| writer.getCompressedLength()); |
| spillRecord.putIndex(indexRecord, i); |
| writer = null; |
| } finally { |
| if (writer != null) { |
| writer.close(); |
| } |
| } |
| } |
| if (isFinalSpill) { |
| long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH; |
| Path finalSpillFile = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate); |
| spillRecord.writeToFile(finalSpillFile, conf); |
| fileOutputBytesCounter.increment(indexFileSizeEstimate); |
| LOG.info("Finished final and only spill"); |
| } else { |
| SpillInfo spillInfo = new SpillInfo(spillRecord, outPath); |
| spillInfoList.add(spillInfo); |
| numAdditionalSpillsCounter.increment(1); |
| LOG.info("Finished spill " + spillNumber); |
| } |
| return spillResult; |
| } |
| } |
| |
| private void writePartition(int pos, WrappedBuffer wrappedBuffer, Writer writer, |
| DataInputBuffer keyBuffer, DataInputBuffer valBuffer) throws IOException { |
| while (pos != WrappedBuffer.PARTITION_ABSENT_POSITION) { |
| int metaIndex = pos / INT_SIZE; |
| int keyLength = wrappedBuffer.metaBuffer.get(metaIndex + INDEX_KEYLEN); |
| int valLength = wrappedBuffer.metaBuffer.get(metaIndex + INDEX_VALLEN); |
| keyBuffer.reset(wrappedBuffer.buffer, pos + META_SIZE, keyLength); |
| valBuffer.reset(wrappedBuffer.buffer, pos + META_SIZE + keyLength, valLength); |
| |
| writer.append(keyBuffer, valBuffer); |
| pos = wrappedBuffer.metaBuffer.get(metaIndex + INDEX_NEXT); |
| } |
| } |
| |
| public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) { |
| int initialMemRequestMb = conf.getInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, |
| TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB_DEFAULT); |
| Preconditions.checkArgument(initialMemRequestMb != 0, |
| TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB + " should be larger than 0"); |
| long reqBytes = initialMemRequestMb << 20; |
| LOG.info("Requested BufferSize (" + TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB |
| + ") : " + initialMemRequestMb); |
| return reqBytes; |
| } |
| |
  /**
   * Finishes the writer: waits for all in-flight spills to complete, then either merges all
   * spill files into a single output (if spills happened) or writes the remaining buffer
   * directly as the final output, and returns the data movement event describing the result.
   *
   * Assumes it runs on the same thread as write() - buffer state is not re-synchronized here.
   *
   * @return a single-element list holding the CompositeDataMovementEvent for all partitions
   * @throws IOException if an earlier spill failed, or the final merge/spill fails
   * @throws InterruptedException if interrupted while waiting for pending spills
   */
  @Override
  public List<Event> close() throws IOException, InterruptedException {
    isShutdown.set(true);
    spillLock.lock();
    LOG.info("Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
    try {
      // Loop also exits early when a spill records a failure (spillException set, signalled
      // by SpillCallback.onFailure).
      while (pendingSpillCount.get() != 0 && spillException == null) {
        spillInProgress.await();
      }
    } finally {
      spillLock.unlock();
    }
    if (spillException != null) {
      LOG.fatal("Error during spill, throwing");
      // Assuming close will be called on the same thread as the write
      cleanup();
      currentBuffer.cleanup();
      currentBuffer = null;
      if (spillException instanceof IOException) {
        throw (IOException) spillException;
      } else {
        throw new IOException(spillException);
      }
    } else {
      LOG.info("All spills complete");
      // Assuming close will be called on the same thread as the write
      cleanup();
      if (numSpills.get() > 0) {
        mergeAll();
      } else {
        finalSpill();
      }

      currentBuffer.cleanup();
      currentBuffer = null;
    }

    return Collections.singletonList(generateEvent());
  }
| |
| private void cleanup() { |
| if (spillExecutor != null) { |
| spillExecutor.shutdownNow(); |
| } |
| for (int i = 0; i < buffers.length; i++) { |
| if (buffers[i] != null && buffers[i] != currentBuffer) { |
| buffers[i].cleanup(); |
| buffers[i] = null; |
| } |
| } |
| availableBuffers.clear(); |
| } |
| |
| private Event generateEvent() throws IOException { |
| DataMovementEventPayloadProto.Builder payloadBuidler = DataMovementEventPayloadProto |
| .newBuilder(); |
| |
| String host = getHost(); |
| int shufflePort = getShufflePort(); |
| |
| BitSet emptyPartitions = new BitSet(); |
| for (int i = 0; i < numPartitions; i++) { |
| if (numRecordsPerPartition[i] == 0) { |
| emptyPartitions.set(i); |
| } |
| } |
| if (emptyPartitions.cardinality() != 0) { |
| // Empty partitions exist |
| ByteString emptyPartitionsByteString = TezCommonUtils.compressByteArrayToByteString(TezUtils |
| .toByteArray(emptyPartitions)); |
| payloadBuidler.setEmptyPartitions(emptyPartitionsByteString); |
| } |
| if (emptyPartitions.cardinality() != numPartitions) { |
| // Populate payload only if at least 1 partition has data |
| payloadBuidler.setHost(host); |
| payloadBuidler.setPort(shufflePort); |
| payloadBuidler.setPathComponent(outputContext.getUniqueIdentifier()); |
| } |
| |
| CompositeDataMovementEvent cDme = new CompositeDataMovementEvent(0, numPartitions, |
| payloadBuidler.build().toByteArray()); |
| return cDme; |
| } |
| |
| private void finalSpill() throws IOException { |
| if (currentBuffer.nextPosition == 0) { |
| return; |
| } else { |
| updateGlobalStats(currentBuffer); |
| SpillCallable spillCallable = new SpillCallable(currentBuffer, 0, codec, null, true); |
| spillCallable.call(); |
| return; |
| } |
| |
| } |
| |
  /**
   * Merges all completed spill files plus whatever is left in the current buffer into the single
   * final output file, writing one IFile segment per non-empty partition and a final index file.
   *
   * For each partition the in-memory data is written first, followed by that partition's segment
   * from every spill file, re-read record by record through an IFile.Reader.
   */
  private void mergeAll() throws IOException {
    long expectedSize = spilledSize;
    if (currentBuffer.nextPosition != 0) {
      // Estimate: payload bytes in the buffer (minus meta and alignment padding) plus
      // approximate per-partition header overhead.
      expectedSize += currentBuffer.nextPosition - (currentBuffer.numRecords * META_SIZE)
          - currentBuffer.skipSize + numPartitions * APPROX_HEADER_LENGTH;
      // Update final statistics.
      updateGlobalStats(currentBuffer);
    }

    long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
    Path finalOutPath = outputFileHandler.getOutputFileForWrite(expectedSize);
    Path finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);

    TezSpillRecord finalSpillRecord = new TezSpillRecord(numPartitions);

    DataInputBuffer keyBuffer = new DataInputBuffer();
    DataInputBuffer valBuffer = new DataInputBuffer();

    DataInputBuffer keyBufferIFile = new DataInputBuffer();
    DataInputBuffer valBufferIFile = new DataInputBuffer();

    FSDataOutputStream out = null;
    try {
      out = rfs.create(finalOutPath);
      Writer writer = null;

      for (int i = 0; i < numPartitions; i++) {
        long segmentStart = out.getPos();
        if (numRecordsPerPartition[i] == 0) {
          LOG.info("Skipping partition: " + i + " in final merge since it has no records");
          continue;
        }
        writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
        try {
          if (currentBuffer.nextPosition != 0
              && currentBuffer.partitionPositions[i] != WrappedBuffer.PARTITION_ABSENT_POSITION) {
            // Write current buffer.
            writePartition(currentBuffer.partitionPositions[i], currentBuffer, writer, keyBuffer,
                valBuffer);
          }
          // Iterate under the list's lock; spills should be finished by now (close() waited),
          // but the list is shared with the spill thread.
          synchronized (spillInfoList) {
            for (SpillInfo spillInfo : spillInfoList) {
              TezIndexRecord indexRecord = spillInfo.spillRecord.getIndex(i);
              if (indexRecord.getPartLength() == 0) {
                // Skip empty partitions within a spill
                continue;
              }
              // NOTE(review): the opened stream appears to be closed via reader.close();
              // confirm IFile.Reader takes ownership of 'in', else this leaks on error.
              FSDataInputStream in = rfs.open(spillInfo.outPath);
              in.seek(indexRecord.getStartOffset());
              IFile.Reader reader = new IFile.Reader(in, indexRecord.getPartLength(), codec, null,
                  additionalSpillBytesReadCounter, ifileReadAhead, ifileReadAheadLength,
                  ifileBufferSize);
              while (reader.nextRawKey(keyBufferIFile)) {
                // TODO Inefficient. If spills are not compressed, a direct copy should be possible
                // given the current IFile format. Also exteremely inefficient for large records,
                // since the entire record will be read into memory.
                reader.nextRawValue(valBufferIFile);
                writer.append(keyBufferIFile, valBufferIFile);
              }
              reader.close();
            }
          }
          writer.close();
          fileOutputBytesCounter.increment(writer.getCompressedLength());
          TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(),
              writer.getCompressedLength());
          writer = null;
          finalSpillRecord.putIndex(indexRecord, i);
        } finally {
          // Close the segment writer if an exception skipped the normal close above.
          if (writer != null) {
            writer.close();
          }
        }
      }
    } finally {
      if (out != null) {
        out.close();
      }
    }
    finalSpillRecord.writeToFile(finalIndexPath, conf);
    fileOutputBytesCounter.increment(indexFileSizeEstimate);
    LOG.info("Finished final spill after merging : " + numSpills.get() + " spills");
  }
| |
  /**
   * Writes a single record that is too large for any in-memory buffer directly to its own spill
   * file, recording an index entry only for the record's partition (all other partitions get
   * default zero-length entries, which the merge skips).
   */
  private void writeLargeRecord(final Object key, final Object value, final int partition,
      final int spillNumber) throws IOException {
    numAdditionalSpillsCounter.increment(1);
    // Size estimate reuses the standard buffer-based formula; the record itself may be larger.
    long size = sizePerBuffer - (currentBuffer.numRecords * META_SIZE) - currentBuffer.skipSize
        + numPartitions * APPROX_HEADER_LENGTH;
    FSDataOutputStream out = null;
    long outSize = 0;
    try {
      final TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
      final Path outPath = outputFileHandler.getSpillFileForWrite(spillNumber, size);
      out = rfs.create(outPath);
      for (int i = 0; i < numPartitions; i++) {
        final long recordStart = out.getPos();
        if (i == partition) {
          spilledRecordsCounter.increment(1);
          Writer writer = null;
          try {
            writer = new IFile.Writer(conf, out, keyClass, valClass, codec, null, null);
            writer.append(key, value);
            outputLargeRecordsCounter.increment(1);
            numRecordsPerPartition[i]++;
            writer.close();
            additionalSpillBytesWritternCounter.increment(writer.getCompressedLength());
            TezIndexRecord indexRecord = new TezIndexRecord(recordStart, writer.getRawLength(),
                writer.getCompressedLength());
            spillRecord.putIndex(indexRecord, i);
            outSize = writer.getCompressedLength();
            writer = null;
          } finally {
            // Close the writer if append/close above threw before the normal close.
            if (writer != null) {
              writer.close();
            }
          }
        }
      }
      SpillInfo spillInfo = new SpillInfo(spillRecord, outPath);
      spillInfoList.add(spillInfo);
      LOG.info("Finished writing large record of size " + outSize + " to spill file " + spillNumber);
    } finally {
      if (out != null) {
        out.close();
      }
    }
  }
| |
| private class ByteArrayOutputStream extends OutputStream { |
| |
| private final byte[] scratch = new byte[1]; |
| |
| @Override |
| public void write(int v) throws IOException { |
| scratch[0] = (byte) v; |
| write(scratch, 0, 1); |
| } |
| |
| public void write(byte[] b, int off, int len) throws IOException { |
| if (len > currentBuffer.availableSize) { |
| throw new BufferTooSmallException(); |
| } else { |
| System.arraycopy(b, off, currentBuffer.buffer, currentBuffer.nextPosition, len); |
| currentBuffer.nextPosition += len; |
| currentBuffer.availableSize -= len; |
| } |
| } |
| } |
| |
| private static class WrappedBuffer { |
| |
| private static final int PARTITION_ABSENT_POSITION = -1; |
| |
| private final int[] partitionPositions; |
| private final int[] recordsPerPartition; |
| private final int numPartitions; |
| private final int size; |
| |
| private byte[] buffer; |
| private IntBuffer metaBuffer; |
| |
| private int numRecords = 0; |
| private int skipSize = 0; |
| |
| private int nextPosition = 0; |
| private int availableSize; |
| |
| WrappedBuffer(int numPartitions, int size) { |
| this.partitionPositions = new int[numPartitions]; |
| this.recordsPerPartition = new int[numPartitions]; |
| this.numPartitions = numPartitions; |
| for (int i = 0; i < numPartitions; i++) { |
| this.partitionPositions[i] = PARTITION_ABSENT_POSITION; |
| this.recordsPerPartition[i] = 0; |
| } |
| size = size - (size % INT_SIZE); |
| this.size = size; |
| this.buffer = new byte[size]; |
| this.metaBuffer = ByteBuffer.wrap(buffer).order(ByteOrder.nativeOrder()).asIntBuffer(); |
| availableSize = size; |
| } |
| |
| void reset() { |
| for (int i = 0; i < numPartitions; i++) { |
| this.partitionPositions[i] = PARTITION_ABSENT_POSITION; |
| this.recordsPerPartition[i] = 0; |
| } |
| numRecords = 0; |
| nextPosition = 0; |
| skipSize = 0; |
| availableSize = size; |
| } |
| |
| void cleanup() { |
| buffer = null; |
| metaBuffer = null; |
| } |
| } |
| |
  /** Thrown by the internal output stream when a serialized record does not fit in the current buffer. */
  private static class BufferTooSmallException extends IOException {
    private static final long serialVersionUID = 1L;
  }
| |
  /**
   * Runs when an asynchronous spill finishes: on success it recycles the spilled buffer into the
   * available pool and wakes close() when the last pending spill completes; on failure it records
   * the error for the writer thread and reports a fatal error to the framework.
   */
  private class SpillCallback implements FutureCallback<SpillResult> {

    private final int spillNumber;

    SpillCallback(int spillNumber) {
      this.spillNumber = spillNumber;
    }

    @Override
    public void onSuccess(SpillResult result) {
      LOG.info("Spill# " + spillNumber + " complete.");
      // NOTE(review): += on a volatile is not atomic; this appears safe only because the spill
      // executor is single-threaded and the callback runs on the completing thread -- confirm.
      spilledSize += result.spillSize;
      try {
        result.wrappedBuffer.reset();
        availableBuffers.add(result.wrappedBuffer);

      } catch (Throwable e) {
        LOG.fatal("Failure while attempting to reset buffer after spill", e);
        outputContext.fatalError(e, "Failure while attempting to reset buffer after spill");
      }

      spillLock.lock();
      try {
        // Only signal when the last pending spill finishes; close() re-checks the count.
        if (pendingSpillCount.decrementAndGet() == 0) {
          spillInProgress.signal();
        }
      } finally {
        spillLock.unlock();
      }
    }

    @Override
    public void onFailure(Throwable t) {
      // spillException setup to throw an exception back to the user. Requires synchronization.
      // Consider removing it in favor of having Tez kill the task
      LOG.fatal("Failure while spilling to disk", t);
      spillException = t;
      outputContext.fatalError(t, "Failure while spilling to disk");
      spillLock.lock();
      try {
        // Wake close() even though pendingSpillCount is not decremented; its wait loop also
        // exits when spillException is set.
        spillInProgress.signal();
      } finally {
        spillLock.unlock();
      }
    }
  }
| |
  /** Pairs the compressed bytes reported by a spill with the buffer that was spilled. */
  private static class SpillResult {
    final long spillSize;
    final WrappedBuffer wrappedBuffer;

    SpillResult(long size, WrappedBuffer wrappedBuffer) {
      this.spillSize = size;
      this.wrappedBuffer = wrappedBuffer;
    }
  }
| |
  /** Associates a spill file's path with the index record describing its per-partition segments. */
  private static class SpillInfo {
    final TezSpillRecord spillRecord;
    final Path outPath;

    SpillInfo(TezSpillRecord spillRecord, Path outPath) {
      this.spillRecord = spillRecord;
      this.outPath = outPath;
    }
  }
| |
| @VisibleForTesting |
| String getHost() { |
| return System.getenv(ApplicationConstants.Environment.NM_HOST.toString()); |
| } |
| |
| @VisibleForTesting |
| int getShufflePort() throws IOException { |
| ByteBuffer shuffleMetadata = outputContext |
| .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); |
| int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata); |
| return shufflePort; |
| } |
| } |