TEZ-4180: Show convenient input -> output vertex names in output/sort messages (#154)
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
index 33fe772..f0de897 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
@@ -33,21 +33,27 @@
* Output's data
* @return Name of the Destination Vertex
*/
- public String getDestinationVertexName();
-
+ String getDestinationVertexName();
+
+ /**
+ * Returns a convenient, human-readable string describing the input and output vertices.
+ * @return the convenient string
+ */
+ String getInputOutputVertexNames();
+
/**
* Get the index of the output in the set of all outputs for the task. The
* index will be consistent and valid only among the tasks of this vertex.
* @return index
*/
- public int getOutputIndex();
+ int getOutputIndex();
/**
* Get an {@link OutputStatisticsReporter} for this {@link Output} that can
* be used to report statistics like data size
* @return {@link OutputStatisticsReporter}
*/
- public OutputStatisticsReporter getStatisticsReporter();
+ OutputStatisticsReporter getStatisticsReporter();
/**
* Notify the context that at this point no more events should be sent.
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 19ece5a..9aeae25 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -458,7 +458,7 @@
initCommitter(jobConf, useNewApi);
}
- LOG.info(getContext().getDestinationVertexName() + ": "
+ LOG.info(getContext().getInputOutputVertexNames() + ": "
+ "outputFormat=" + outputFormatClassName
+ ", using newmapreduce API=" + useNewApi);
return null;
@@ -576,7 +576,7 @@
@Override
public synchronized List<Event> close() throws IOException {
flush();
- LOG.info(getContext().getDestinationVertexName() + " closed");
+ LOG.info(getContext().getInputOutputVertexNames() + " closed");
long outputRecords = getContext().getCounters()
.findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 20ec062..a17bc89 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -154,6 +154,12 @@
return destinationVertexName;
}
+
+ @Override
+ public String getInputOutputVertexNames() {
+ return String.format("%s -> %s", getTaskVertexName(), getDestinationVertexName());
+ }
+
@Override
public void fatalError(Throwable exception, String message) {
super.signalFatalError(exception, message, sourceInfo);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 3ff74f7..758c069 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -183,7 +183,7 @@
rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
if (LOG.isDebugEnabled()) {
- LOG.debug(outputContext.getDestinationVertexName() + ": Initial Mem bytes : " +
+ LOG.debug(outputContext.getInputOutputVertexNames() + ": Initial Mem bytes : " +
initialMemoryAvailable + ", in MB=" + ((initialMemoryAvailable >> 20)));
}
int assignedMb = (int) (initialMemoryAvailable >> 20);
@@ -201,7 +201,7 @@
this.serializationContext = new SerializationContext(this.conf);
keySerializer = serializationContext.getKeySerializer();
valSerializer = serializationContext.getValueSerializer();
- LOG.info(outputContext.getDestinationVertexName() + " using: "
+ LOG.info(outputContext.getInputOutputVertexNames() + " using: "
+ "memoryMb=" + assignedMb
+ ", keySerializerClass=" + serializationContext.getKeyClass()
+ ", valueSerializerClass=" + valSerializer
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index b70d6c4..08786c9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -161,7 +161,7 @@
}
StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ")
- .append(outputContext.getDestinationVertexName()).append(": ");
+ .append(outputContext.getInputOutputVertexNames()).append(": ");
partitionBits = bitcount(partitions)+1;
boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
@@ -235,10 +235,9 @@
TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT);
sortmaster = Executors.newFixedThreadPool(sortThreads,
new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Sorter {" + TezUtilsInternal
- .cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d")
- .build());
-
+ .setNameFormat("Sorter {" + TezUtilsInternal.cleanVertexName(outputContext.getTaskVertexName()) + " -> "
+ + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d")
+ .build());
valSerializer.open(span.out);
keySerializer.open(span.out);
@@ -336,7 +335,8 @@
boolean ret = spill(true);
stopWatch.stop();
if (LOG.isDebugEnabled()) {
- LOG.debug(outputContext.getDestinationVertexName() + ": Time taken for spill " + (stopWatch.now(TimeUnit.MILLISECONDS)) + " ms");
+ LOG.debug(outputContext.getInputOutputVertexNames() + ": Time taken for spill "
+ + (stopWatch.now(TimeUnit.MILLISECONDS)) + " ms");
}
if (pipelinedShuffle && ret) {
sendPipelinedShuffleEvents();
@@ -380,7 +380,7 @@
partitions, sendEmptyPartitionDetails, pathComponent, partitionStats,
reportDetailedPartitionStats(), auxiliaryService, deflater);
outputContext.sendEvents(events);
- LOG.info(outputContext.getDestinationVertexName() +
+ LOG.info(outputContext.getInputOutputVertexNames() +
": Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
}
@@ -496,7 +496,7 @@
ensureSpillFilePermissions(filename, rfs);
try {
- LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString() +
+ LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling to " + filename.toString() +
", indexFilename=" + indexFilename);
for (int i = 0; i < partitions; ++i) {
if (isThreadInterrupted()) {
@@ -568,8 +568,9 @@
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- LOG.info(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete");
- throw new IOInterruptedException(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete", e);
+ LOG.info(outputContext.getInputOutputVertexNames() + ": Interrupted while waiting for mergers to complete");
+ throw new IOInterruptedException(
+ outputContext.getInputOutputVertexNames() + ": Interrupted while waiting for mergers to complete", e);
}
// create spill file
@@ -581,7 +582,7 @@
spillFilePaths.put(numSpills, filename);
out = rfs.create(filename, true, 4096);
ensureSpillFilePermissions(filename, rfs);
- LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString());
+ LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling to " + filename.toString());
for (int i = 0; i < partitions; ++i) {
if (isThreadInterrupted()) {
return false;
@@ -652,8 +653,9 @@
cleanup();
}
sortmaster.shutdownNow();
- LOG.info(outputContext.getDestinationVertexName() + ": Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster
- .isShutdown() + ", terminated=" + sortmaster.isTerminated());
+ LOG.info(outputContext.getInputOutputVertexNames()
+ + ": Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster.isShutdown()
+ + ", terminated=" + sortmaster.isTerminated());
return true;
}
return false;
@@ -674,7 +676,7 @@
}
try {
- LOG.info(outputContext.getDestinationVertexName() + ": Starting flush of map output");
+ LOG.info(outputContext.getInputOutputVertexNames() + ": Starting flush of map output");
span.end();
merger.add(span.sort(sorter));
// force a spill in flush()
@@ -698,7 +700,7 @@
* NPE leading to distraction when debugging.
*/
if (LOG.isDebugEnabled()) {
- LOG.debug(outputContext.getDestinationVertexName()
+ LOG.debug(outputContext.getInputOutputVertexNames()
+ ": Index list is empty... returning");
}
return;
@@ -717,7 +719,8 @@
outputContext, i, indexCacheList.get(i), partitions,
sendEmptyPartitionDetails, pathComponent, partitionStats,
reportDetailedPartitionStats(), auxiliaryService, deflater);
- LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
+ LOG.info(outputContext.getInputOutputVertexNames() + ": Adding spill event for spill (final update="
+ + isLastEvent + "), spillId=" + i);
}
return;
}
@@ -736,7 +739,7 @@
sameVolRename(filename, finalOutputFile);
sameVolRename(indexFilename, finalIndexFile);
if (LOG.isDebugEnabled()) {
- LOG.debug(outputContext.getDestinationVertexName() + ": numSpills=" + numSpills +
+ LOG.debug(outputContext.getInputOutputVertexNames() + ": numSpills=" + numSpills +
", finalOutputFile=" + finalOutputFile + ", "
+ "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
indexFilename);
@@ -759,7 +762,7 @@
mapOutputFile.getOutputIndexFileForWrite(0); //TODO
if (LOG.isDebugEnabled()) {
- LOG.debug(outputContext.getDestinationVertexName() + ": " +
+ LOG.debug(outputContext.getInputOutputVertexNames() + ": " +
"numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:"
+ finalIndexFile);
}
@@ -944,7 +947,7 @@
}
ByteBuffer reserved = source.duplicate();
reserved.mark();
- LOG.info(outputContext.getDestinationVertexName() + ": " + "reserved.remaining()=" +
+ LOG.info(outputContext.getInputOutputVertexNames() + ": " + "reserved.remaining()=" +
reserved.remaining() + ", reserved.metasize=" + metasize);
reserved.position(metasize);
kvbuffer = reserved.slice();
@@ -966,8 +969,8 @@
if(length() > 1) {
sorter.sort(this, 0, length(), progressable);
}
- LOG.info(outputContext.getDestinationVertexName() + ": " + "done sorting span=" + index + ", length=" + length() + ", "
- + "time=" + (System.currentTimeMillis() - start));
+ LOG.info(outputContext.getInputOutputVertexNames() + ": " + "done sorting span=" + index + ", length=" + length()
+ + ", " + "time=" + (System.currentTimeMillis() - start));
return new SpanIterator((SortSpan)this);
}
@@ -1042,8 +1045,9 @@
}
newSpan = new SortSpan(remaining, items, perItem, newComparator);
newSpan.index = index+1;
- LOG.info(String.format(outputContext.getDestinationVertexName() + ": " + "New Span%d.length = %d, perItem = %d", newSpan.index, newSpan
- .length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue());
+ LOG.info(
+ String.format(outputContext.getInputOutputVertexNames() + ": " + "New Span%d.length = %d, perItem = %d",
+ newSpan.index, newSpan.length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue());
return newSpan;
}
return null;
@@ -1064,13 +1068,14 @@
return null;
}
int perItem = kvbuffer.position()/items;
- LOG.info(outputContext.getDestinationVertexName() + ": " + String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
+ LOG.info(outputContext.getInputOutputVertexNames() + ": "
+ + String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
if(remaining.remaining() < METASIZE+perItem) {
//Check if we can get the next Buffer from the main buffer list
ByteBuffer space = allocateSpace();
if (space != null) {
- LOG.info(outputContext.getDestinationVertexName() + ": " + "Getting memory from next block in the list, recordsWritten=" +
- mapOutputRecordCounter.getValue());
+ LOG.info(outputContext.getInputOutputVertexNames() + ": "
+ + "Getting memory from next block in the list, recordsWritten=" + mapOutputRecordCounter.getValue());
reinit = true;
return space;
}
@@ -1403,7 +1408,7 @@
total += sp.span.length();
eq += sp.span.getEq();
}
- LOG.info(outputContext.getDestinationVertexName() + ": " + "Heap = " + sb.toString());
+ LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Heap = " + sb.toString());
return true;
} catch(ExecutionException e) {
LOG.error("Heap size={}, total={}, eq={}, partition={}, gallop={}, totalItr={},"
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index dd6c083..7c67874 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -153,7 +153,7 @@
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
if (confPipelinedShuffle) {
- LOG.warn(outputContext.getDestinationVertexName() + ": " +
+ LOG.warn(outputContext.getInputOutputVertexNames() + ": " +
TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " does not work "
+ "with DefaultSorter. It is supported only with PipelinedSorter.");
}
@@ -371,7 +371,8 @@
kvindex = (int)(((long)kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity());
totalKeys++;
} catch (MapBufferTooSmallException e) {
- LOG.info(outputContext.getDestinationVertexName() + ": Record too large for in-memory buffer: " + e.getMessage());
+ LOG.info(
+ outputContext.getInputOutputVertexNames() + ": Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value, partition);
mapOutputRecordCounter.increment(1);
return;
@@ -390,7 +391,7 @@
// Cast one of the operands to long to avoid integer overflow
kvindex = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
if (LOG.isInfoEnabled()) {
- LOG.info(outputContext.getDestinationVertexName() + ": " + "(EQUATOR) " + pos + " kvi " + kvindex +
+ LOG.info(outputContext.getInputOutputVertexNames() + ": " + "(EQUATOR) " + pos + " kvi " + kvindex +
"(" + (kvindex * 4) + ")");
}
}
@@ -408,7 +409,7 @@
// Cast one of the operands to long to avoid integer overflow
kvstart = kvend = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
if (LOG.isInfoEnabled()) {
- LOG.info(outputContext.getDestinationVertexName() + ": " + "(RESET) equator " + e + " kv " + kvstart + "(" +
+ LOG.info(outputContext.getInputOutputVertexNames() + ": " + "(RESET) equator " + e + " kv " + kvstart + "(" +
(kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
}
}
@@ -664,7 +665,7 @@
spillThread.interrupt();
spillThread.join();
} catch (InterruptedException e) {
- LOG.info(outputContext.getDestinationVertexName() + ": " + "Spill thread interrupted");
+ LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Spill thread interrupted");
//Reset status
Thread.currentThread().interrupt();
throw new IOInterruptedException("Spill failed", e);
@@ -673,7 +674,7 @@
@Override
public void flush() throws IOException {
- LOG.info(outputContext.getDestinationVertexName() + ": " + "Starting flush of map output");
+ LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Starting flush of map output");
outputContext.notifyProgress();
if (Thread.currentThread().isInterrupted()) {
/**
@@ -710,7 +711,7 @@
bufend = bufmark;
if (LOG.isInfoEnabled()) {
LOG.info(
- outputContext.getDestinationVertexName() + ": " + "Sorting & Spilling map output. "
+ outputContext.getInputOutputVertexNames() + ": " + "Sorting & Spilling map output. "
+ "bufstart = " + bufstart + ", bufend = " + bufmark + ", bufvoid = " + bufvoid
+ "; " + "kvstart=" + kvstart + "(" + (kvstart * 4) + ")"
+ ", kvend = " + kvend + "(" + (kvend * 4) + ")"
@@ -781,7 +782,7 @@
spillLock.unlock();
sortAndSpill(sameKeyCount, totalKeysCount);
} catch (Throwable t) {
- LOG.warn(outputContext.getDestinationVertexName() + ": " + "Got an exception in sortAndSpill", t);
+ LOG.warn(outputContext.getInputOutputVertexNames() + ": " + "Got an exception in sortAndSpill", t);
sortSpillException = t;
} finally {
spillLock.lock();
@@ -794,7 +795,7 @@
}
}
} catch (InterruptedException e) {
- LOG.info(outputContext.getDestinationVertexName() + ": " + "Spill thread interrupted");
+ LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Spill thread interrupted");
Thread.currentThread().interrupt();
} finally {
spillLock.unlock();
@@ -830,7 +831,7 @@
bufend = bufmark;
spillInProgress = true;
if (LOG.isInfoEnabled()) {
- LOG.info(outputContext.getDestinationVertexName() + ": Spilling map output."
+ LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling map output."
+ "bufstart=" + bufstart + ", bufend = " + bufmark + ", bufvoid = " + bufvoid
+"; kvstart=" + kvstart + "(" + (kvstart * 4) + ")"
+", kvend = " + kvend + "(" + (kvend * 4) + ")"
@@ -936,7 +937,7 @@
TezRawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
if (LOG.isDebugEnabled()) {
- LOG.debug(outputContext.getDestinationVertexName() + ": " + "Running combine processor");
+ LOG.debug(outputContext.getInputOutputVertexNames() + ": " + "Running combine processor");
}
runCombineProcessor(kvIter, writer);
}
@@ -975,7 +976,7 @@
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
- LOG.info(outputContext.getDestinationVertexName() + ": " + "Finished spill " + numSpills
+ LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Finished spill " + numSpills
+ " at " + filename.toString());
++numSpills;
if (!isFinalMergeEnabled()) {
@@ -1172,7 +1173,7 @@
outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent,
partitionStats, reportDetailedPartitionStats(), auxiliaryService, deflater);
- LOG.info(outputContext.getDestinationVertexName() + ": " +
+ LOG.info(outputContext.getInputOutputVertexNames() + ": " +
"Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);
if (sendEvent) {
@@ -1339,7 +1340,7 @@
segmentList.add(s);
}
if (LOG.isDebugEnabled()) {
- LOG.debug(outputContext.getDestinationVertexName() + ": "
+ LOG.debug(outputContext.getInputOutputVertexNames() + ": "
+ "TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
"Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
indexRecord.getRawLength() + ", " +
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 5ff2944..faf7586 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -104,7 +104,7 @@
// Maybe setup a separate statistics class which can be shared between the
// buffer and the main path instead of having multiple arrays.
- private final String destNameTrimmed;
+ private final String sourceDestNameTrimmed;
private final long availableMemory;
@VisibleForTesting
final WrappedBuffer[] buffers;
@@ -206,7 +206,8 @@
Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes");
- this.destNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName());
+ this.sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getTaskVertexName()) + " -> "
+ + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName());
//Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in
// this case. Add it later if needed.
boolean pipelinedShuffleConf = this.conf.getBoolean(TezRuntimeConfiguration
@@ -257,7 +258,7 @@
buffers[0] = new WrappedBuffer(numOutputs, sizePerBuffer);
numInitializedBuffers = 1;
if (LOG.isDebugEnabled()) {
- LOG.debug(destNameTrimmed + ": " + "Initializing Buffer #" +
+ LOG.debug(sourceDestNameTrimmed + ": " + "Initializing Buffer #" +
numInitializedBuffers + " with size=" + sizePerBuffer);
}
currentBuffer = buffers[0];
@@ -313,7 +314,7 @@
skipBuffers = false;
writer = null;
}
- LOG.info(destNameTrimmed + ": "
+ LOG.info(sourceDestNameTrimmed + ": "
+ "numBuffers=" + numBuffers
+ ", sizePerBuffer=" + sizePerBuffer
+ ", skipBuffers=" + skipBuffers
@@ -493,7 +494,7 @@
// Update overall stats
final int filledBufferCount = filledBuffers.size();
if (LOG.isDebugEnabled() || (filledBufferCount % 10) == 0) {
- LOG.info(destNameTrimmed + ": " + "Moving to next buffer. Total filled buffers: " + filledBufferCount);
+ LOG.info(sourceDestNameTrimmed + ": " + "Moving to next buffer. Total filled buffers: " + filledBufferCount);
}
updateGlobalStats(currentBuffer);
@@ -531,7 +532,7 @@
final int filledBufferCount = filledBuffers.size();
if (LOG.isDebugEnabled() || (filledBufferCount % 10) == 0) {
- LOG.info(destNameTrimmed + ": triggering spill. filledBuffers.size=" + filledBufferCount);
+ LOG.info(sourceDestNameTrimmed + ": triggering spill. filledBuffers.size=" + filledBufferCount);
}
pendingSpillCount.incrementAndGet();
int spillNumber = numSpills.getAndIncrement();
@@ -673,10 +674,10 @@
spillResult = new SpillResult(compressedLength, this.filledBuffers);
handleSpillIndex(spillPathDetails, spillRecord);
- LOG.info(destNameTrimmed + ": " + "Finished spill " + spillIndex);
+ LOG.info(sourceDestNameTrimmed + ": " + "Finished spill " + spillIndex);
if (LOG.isDebugEnabled()) {
- LOG.debug(destNameTrimmed + ": " + "Spill=" + spillIndex + ", indexPath="
+ LOG.debug(sourceDestNameTrimmed + ": " + "Spill=" + spillIndex + ", indexPath="
+ spillPathDetails.indexFilePath + ", outputPath=" + spillPathDetails.outputFilePath);
}
return spillResult;
@@ -754,7 +755,8 @@
isShutdown.set(true);
spillLock.lock();
try {
- LOG.info(destNameTrimmed + ": " + "Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
+ LOG.info(
+ sourceDestNameTrimmed + ": " + "Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
while (pendingSpillCount.get() != 0 && spillException == null) {
spillInProgress.await();
}
@@ -762,7 +764,7 @@
spillLock.unlock();
}
if (spillException != null) {
- LOG.error(destNameTrimmed + ": " + "Error during spill, throwing");
+ LOG.error(sourceDestNameTrimmed + ": " + "Error during spill, throwing");
// Assuming close will be called on the same thread as the write
cleanup();
currentBuffer.cleanup();
@@ -773,7 +775,7 @@
throw new IOException(spillException);
}
} else {
- LOG.info(destNameTrimmed + ": " + "All spills complete");
+ LOG.info(sourceDestNameTrimmed + ": " + "All spills complete");
// Assuming close will be called on the same thread as the write
cleanup();
@@ -1082,7 +1084,8 @@
for (int i = 0; i < numPartitions; i++) {
long segmentStart = out.getPos();
if (numRecordsPerPartition[i] == 0) {
- LOG.info(destNameTrimmed + ": " + "Skipping partition: " + i + " in final merge since it has no records");
+ LOG.info(
+ sourceDestNameTrimmed + ": " + "Skipping partition: " + i + " in final merge since it has no records");
continue;
}
writer = new Writer(keySerialization, valSerialization, out, keyClass, valClass, codec, null, null);
@@ -1136,7 +1139,7 @@
}
finalSpillRecord.writeToFile(finalIndexPath, conf, localFs);
fileOutputBytesCounter.increment(indexFileSizeEstimate);
- LOG.info(destNameTrimmed + ": " + "Finished final spill after merging : " + numSpills.get() + " spills");
+ LOG.info(sourceDestNameTrimmed + ": " + "Finished final spill after merging : " + numSpills.get() + " spills");
}
private void deleteIntermediateSpills() {
@@ -1208,9 +1211,10 @@
mayBeSendEventsForSpill(emptyPartitions, sizePerPartition,
spillIndex, false);
- LOG.info(destNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file " + spillIndex);
+ LOG.info(sourceDestNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file "
+ + spillIndex);
if (LOG.isDebugEnabled()) {
- LOG.debug(destNameTrimmed + ": " + "LargeRecord Spill=" + spillIndex + ", indexPath="
+ LOG.debug(sourceDestNameTrimmed + ": " + "LargeRecord Spill=" + spillIndex + ", indexPath="
+ spillPathDetails.indexFilePath + ", outputPath="
+ spillPathDetails.outputFilePath);
}
@@ -1346,7 +1350,7 @@
try {
events = generateEventForSpill(emptyPartitions, sizePerPartition, spillNumber,
isFinalUpdate);
- LOG.info(destNameTrimmed + ": " + "Adding spill event for spill"
+ LOG.info(sourceDestNameTrimmed + ": " + "Adding spill event for spill"
+ " (final update=" + isFinalUpdate + "), spillId=" + spillNumber);
if (pipelinedShuffle) {
//Send out an event for consuming.
@@ -1355,7 +1359,7 @@
this.finalEvents.addAll(events);
}
} catch (IOException e) {
- LOG.error(destNameTrimmed + ": " + "Error in sending pipelined events", e);
+ LOG.error(sourceDestNameTrimmed + ": " + "Error in sending pipelined events", e);
outputContext.reportFailure(TaskFailureType.NON_FATAL, e,
"Error in sending events.");
}
@@ -1414,7 +1418,7 @@
availableBuffers.add(buffer);
}
} catch (Throwable e) {
- LOG.error(destNameTrimmed + ": Failure while attempting to reset buffer after spill", e);
+ LOG.error(sourceDestNameTrimmed + ": Failure while attempting to reset buffer after spill", e);
outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Failure while attempting to reset buffer after spill");
}
@@ -1444,7 +1448,7 @@
public void onFailure(Throwable t) {
// spillException setup to throw an exception back to the user. Requires synchronization.
// Consider removing it in favor of having Tez kill the task
- LOG.error(destNameTrimmed + ": " + "Failure while spilling to disk", t);
+ LOG.error(sourceDestNameTrimmed + ": " + "Failure while spilling to disk", t);
spillException = t;
outputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Failure while spilling to disk");
spillLock.lock();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 676fe17..44cb9d6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -135,7 +135,7 @@
if (pipelinedShuffle) {
if (finalMergeEnabled) {
- LOG.info(getContext().getDestinationVertexName() + " disabling final merge as "
+ LOG.info(getContext().getInputOutputVertexNames() + " disabling final merge as "
+ TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " is enabled.");
finalMergeEnabled = false;
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
@@ -194,7 +194,7 @@
returnEvents.addAll(generateEvents());
sorter = null;
} else {
- LOG.warn(getContext().getDestinationVertexName() +
+ LOG.warn(getContext().getInputOutputVertexNames() +
": Attempting to close output {} of type {} before it was started. Generating empty events",
getContext().getDestinationVertexName(), this.getClass().getSimpleName());
returnEvents = generateEmptyEvents();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index e7a4429..bcacc52 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -106,7 +106,7 @@
this.kvWriter = new UnorderedPartitionedKVWriter(getContext(), conf, 1,
memoryUpdateCallbackHandler.getMemoryAssigned());
isStarted.set(true);
- LOG.info(getContext().getDestinationVertexName() + " started. MemoryAssigned="
+ LOG.info(getContext().getInputOutputVertexNames() + " started. MemoryAssigned="
+ memoryUpdateCallbackHandler.getMemoryAssigned());
}
}
@@ -130,7 +130,7 @@
returnEvents = kvWriter.close();
kvWriter = null;
} else {
- LOG.warn(getContext().getDestinationVertexName() +
+ LOG.warn(getContext().getInputOutputVertexNames() +
": Attempting to close output {} of type {} before it was started. Generating empty events",
getContext().getDestinationVertexName(), this.getClass().getSimpleName());
returnEvents = new LinkedList<Event>();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 439b732..9bc7ea4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -108,7 +108,7 @@
returnEvents = kvWriter.close();
kvWriter = null;
} else {
- LOG.warn(getContext().getDestinationVertexName() +
+ LOG.warn(getContext().getInputOutputVertexNames() +
": Attempting to close output {} of type {} before it was started. Generating empty events",
getContext().getDestinationVertexName(), this.getClass().getSimpleName());
returnEvents = new LinkedList<Event>();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
index b81c2bd..a7c7ca2 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
@@ -69,6 +69,7 @@
}).when(ctx).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class));
doReturn(conf).when(ctx).getContainerConfiguration();
doReturn(TezUtils.createUserPayloadFromConf(userPayloadConf)).when(ctx).getUserPayload();
+ doReturn("taskVertex").when(ctx).getTaskVertexName();
doReturn("destinationVertex").when(ctx).getDestinationVertexName();
doReturn("UUID").when(ctx).getUniqueIdentifier();
doReturn(new String[] { workingDir.toString() }).when(ctx).getWorkDirs();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 2c9c3b2..7999d45 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -406,6 +406,7 @@
doReturn(payLoad).when(context).getUserPayload();
doReturn(5 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask();
doReturn(UniqueID).when(context).getUniqueIdentifier();
+ doReturn("v0").when(context).getTaskVertexName();
doReturn("v1").when(context).getDestinationVertexName();
doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context)
.getServiceProviderMetaData