TEZ-4180: Show convenient input -> output vertex names in output/sort messages (#154)

diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
index 33fe772..f0de897 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
@@ -33,21 +33,27 @@
    * Output's data
    * @return Name of the Destination Vertex
    */
-  public String getDestinationVertexName();
-  
+  String getDestinationVertexName();
+
+  /**
+   * Returns a convenient, human-readable string describing the input and output vertices.
+   * @return the convenient string
+   */
+  String getInputOutputVertexNames();
+
   /**
    * Get the index of the output in the set of all outputs for the task. The 
    * index will be consistent and valid only among the tasks of this vertex.
    * @return index
    */
-  public int getOutputIndex();
+  int getOutputIndex();
   
   /**
    * Get an {@link OutputStatisticsReporter} for this {@link Output} that can
    * be used to report statistics like data size
    * @return {@link OutputStatisticsReporter}
    */
-  public OutputStatisticsReporter getStatisticsReporter();
+  OutputStatisticsReporter getStatisticsReporter();
 
   /**
    * Notify the context that at this point no more events should be sent.
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 19ece5a..9aeae25 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -458,7 +458,7 @@
       initCommitter(jobConf, useNewApi);
     }
 
-    LOG.info(getContext().getDestinationVertexName() + ": "
+    LOG.info(getContext().getInputOutputVertexNames() + ": "
         + "outputFormat=" + outputFormatClassName
         + ", using newmapreduce API=" + useNewApi);
     return null;
@@ -576,7 +576,7 @@
   @Override
   public synchronized List<Event> close() throws IOException {
     flush();
-    LOG.info(getContext().getDestinationVertexName() + " closed");
+    LOG.info(getContext().getInputOutputVertexNames() + " closed");
     long outputRecords = getContext().getCounters()
         .findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
     getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 20ec062..a17bc89 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -154,6 +154,12 @@
     return destinationVertexName;
   }
 
+
+  @Override
+  public String getInputOutputVertexNames() {
+    return String.format("%s -> %s", getTaskVertexName(), getDestinationVertexName());
+  }
+
   @Override
   public void fatalError(Throwable exception, String message) {
     super.signalFatalError(exception, message, sourceInfo);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 3ff74f7..758c069 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -183,7 +183,7 @@
     rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(outputContext.getDestinationVertexName() + ": Initial Mem bytes : " +
+      LOG.debug(outputContext.getInputOutputVertexNames() + ": Initial Mem bytes : " +
           initialMemoryAvailable + ", in MB=" + ((initialMemoryAvailable >> 20)));
     }
     int assignedMb = (int) (initialMemoryAvailable >> 20);
@@ -201,7 +201,7 @@
     this.serializationContext = new SerializationContext(this.conf);
     keySerializer = serializationContext.getKeySerializer();
     valSerializer = serializationContext.getValueSerializer();
-    LOG.info(outputContext.getDestinationVertexName() + " using: "
+    LOG.info(outputContext.getInputOutputVertexNames() + " using: "
         + "memoryMb=" + assignedMb
         + ", keySerializerClass=" + serializationContext.getKeyClass()
         + ", valueSerializerClass=" + valSerializer
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index b70d6c4..08786c9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -161,7 +161,7 @@
     }
 
     StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ")
-        .append(outputContext.getDestinationVertexName()).append(": ");
+        .append(outputContext.getInputOutputVertexNames()).append(": ");
     partitionBits = bitcount(partitions)+1;
 
     boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
@@ -235,10 +235,9 @@
                 TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT);
     sortmaster = Executors.newFixedThreadPool(sortThreads,
         new ThreadFactoryBuilder().setDaemon(true)
-        .setNameFormat("Sorter {" + TezUtilsInternal
-            .cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d")
-        .build());
-
+            .setNameFormat("Sorter {" + TezUtilsInternal.cleanVertexName(outputContext.getTaskVertexName()) + " -> "
+                + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d")
+            .build());
 
     valSerializer.open(span.out);
     keySerializer.open(span.out);
@@ -336,7 +335,8 @@
       boolean ret = spill(true);
       stopWatch.stop();
       if (LOG.isDebugEnabled()) {
-        LOG.debug(outputContext.getDestinationVertexName() + ": Time taken for spill " + (stopWatch.now(TimeUnit.MILLISECONDS)) + " ms");
+        LOG.debug(outputContext.getInputOutputVertexNames() + ": Time taken for spill "
+            + (stopWatch.now(TimeUnit.MILLISECONDS)) + " ms");
       }
       if (pipelinedShuffle && ret) {
         sendPipelinedShuffleEvents();
@@ -380,7 +380,7 @@
         partitions, sendEmptyPartitionDetails, pathComponent, partitionStats,
         reportDetailedPartitionStats(), auxiliaryService, deflater);
     outputContext.sendEvents(events);
-    LOG.info(outputContext.getDestinationVertexName() +
+    LOG.info(outputContext.getInputOutputVertexNames() +
         ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
   }
 
@@ -496,7 +496,7 @@
     ensureSpillFilePermissions(filename, rfs);
 
     try {
-      LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString() +
+      LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling to " + filename.toString() +
           ", indexFilename=" + indexFilename);
       for (int i = 0; i < partitions; ++i) {
         if (isThreadInterrupted()) {
@@ -568,8 +568,9 @@
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        LOG.info(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete");
-        throw new IOInterruptedException(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete", e);
+        LOG.info(outputContext.getInputOutputVertexNames() + ": Interrupted while waiting for mergers to complete");
+        throw new IOInterruptedException(
+            outputContext.getInputOutputVertexNames() + ": Interrupted while waiting for mergers to complete", e);
       }
 
       // create spill file
@@ -581,7 +582,7 @@
       spillFilePaths.put(numSpills, filename);
       out = rfs.create(filename, true, 4096);
       ensureSpillFilePermissions(filename, rfs);
-      LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString());
+      LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling to " + filename.toString());
       for (int i = 0; i < partitions; ++i) {
         if (isThreadInterrupted()) {
           return false;
@@ -652,8 +653,9 @@
         cleanup();
       }
       sortmaster.shutdownNow();
-      LOG.info(outputContext.getDestinationVertexName() + ": Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster
-          .isShutdown() + ", terminated=" + sortmaster.isTerminated());
+      LOG.info(outputContext.getInputOutputVertexNames()
+          + ": Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster.isShutdown()
+          + ", terminated=" + sortmaster.isTerminated());
       return true;
     }
     return false;
@@ -674,7 +676,7 @@
     }
 
     try {
-      LOG.info(outputContext.getDestinationVertexName() + ": Starting flush of map output");
+      LOG.info(outputContext.getInputOutputVertexNames() + ": Starting flush of map output");
       span.end();
       merger.add(span.sort(sorter));
       // force a spill in flush()
@@ -698,7 +700,7 @@
          * NPE leading to distraction when debugging.
          */
         if (LOG.isDebugEnabled()) {
-          LOG.debug(outputContext.getDestinationVertexName()
+          LOG.debug(outputContext.getInputOutputVertexNames()
               + ": Index list is empty... returning");
         }
         return;
@@ -717,7 +719,8 @@
               outputContext, i, indexCacheList.get(i), partitions,
               sendEmptyPartitionDetails, pathComponent, partitionStats,
               reportDetailedPartitionStats(), auxiliaryService, deflater);
-          LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
+          LOG.info(outputContext.getInputOutputVertexNames() + ": Adding spill event for spill (final update="
+              + isLastEvent + "), spillId=" + i);
         }
         return;
       }
@@ -736,7 +739,7 @@
         sameVolRename(filename, finalOutputFile);
         sameVolRename(indexFilename, finalIndexFile);
         if (LOG.isDebugEnabled()) {
-          LOG.debug(outputContext.getDestinationVertexName() + ": numSpills=" + numSpills +
+          LOG.debug(outputContext.getInputOutputVertexNames() + ": numSpills=" + numSpills +
               ", finalOutputFile=" + finalOutputFile + ", "
               + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
               indexFilename);
@@ -759,7 +762,7 @@
           mapOutputFile.getOutputIndexFileForWrite(0); //TODO
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug(outputContext.getDestinationVertexName() + ": " +
+        LOG.debug(outputContext.getInputOutputVertexNames() + ": " +
             "numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:"
                 + finalIndexFile);
       }
@@ -944,7 +947,7 @@
       }
       ByteBuffer reserved = source.duplicate();
       reserved.mark();
-      LOG.info(outputContext.getDestinationVertexName() + ": " + "reserved.remaining()=" +
+      LOG.info(outputContext.getInputOutputVertexNames() + ": " + "reserved.remaining()=" +
           reserved.remaining() + ", reserved.metasize=" + metasize);
       reserved.position(metasize);
       kvbuffer = reserved.slice();
@@ -966,8 +969,8 @@
       if(length() > 1) {
         sorter.sort(this, 0, length(), progressable);
       }
-      LOG.info(outputContext.getDestinationVertexName() + ": " + "done sorting span=" + index + ", length=" + length() + ", "
-          + "time=" + (System.currentTimeMillis() - start));
+      LOG.info(outputContext.getInputOutputVertexNames() + ": " + "done sorting span=" + index + ", length=" + length()
+          + ", " + "time=" + (System.currentTimeMillis() - start));
       return new SpanIterator((SortSpan)this);
     }
 
@@ -1042,8 +1045,9 @@
         }
         newSpan = new SortSpan(remaining, items, perItem, newComparator);
         newSpan.index = index+1;
-        LOG.info(String.format(outputContext.getDestinationVertexName() + ": " + "New Span%d.length = %d, perItem = %d", newSpan.index, newSpan
-            .length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue());
+        LOG.info(
+            String.format(outputContext.getInputOutputVertexNames() + ": " + "New Span%d.length = %d, perItem = %d",
+                newSpan.index, newSpan.length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue());
         return newSpan;
       }
       return null;
@@ -1064,13 +1068,14 @@
         return null;
       }
       int perItem = kvbuffer.position()/items;
-      LOG.info(outputContext.getDestinationVertexName() + ": " + String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
+      LOG.info(outputContext.getInputOutputVertexNames() + ": "
+          + String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
       if(remaining.remaining() < METASIZE+perItem) {
         //Check if we can get the next Buffer from the main buffer list
         ByteBuffer space = allocateSpace();
         if (space != null) {
-          LOG.info(outputContext.getDestinationVertexName() + ": " + "Getting memory from next block in the list, recordsWritten=" +
-              mapOutputRecordCounter.getValue());
+          LOG.info(outputContext.getInputOutputVertexNames() + ": "
+              + "Getting memory from next block in the list, recordsWritten=" + mapOutputRecordCounter.getValue());
           reinit = true;
           return space;
         }
@@ -1403,7 +1408,7 @@
             total += sp.span.length();
             eq += sp.span.getEq();
         }
-        LOG.info(outputContext.getDestinationVertexName() + ": " + "Heap = " + sb.toString());
+        LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Heap = " + sb.toString());
         return true;
       } catch(ExecutionException e) {
         LOG.error("Heap size={}, total={}, eq={}, partition={}, gallop={}, totalItr={},"
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index dd6c083..7c67874 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -153,7 +153,7 @@
         .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
 
     if (confPipelinedShuffle) {
-      LOG.warn(outputContext.getDestinationVertexName() + ": " +
+      LOG.warn(outputContext.getInputOutputVertexNames() + ": " +
           TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " does not work "
           + "with DefaultSorter. It is supported only with PipelinedSorter.");
     }
@@ -371,7 +371,8 @@
       kvindex = (int)(((long)kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity());
       totalKeys++;
     } catch (MapBufferTooSmallException e) {
-      LOG.info(outputContext.getDestinationVertexName() + ": Record too large for in-memory buffer: " + e.getMessage());
+      LOG.info(
+          outputContext.getInputOutputVertexNames() + ": Record too large for in-memory buffer: " + e.getMessage());
       spillSingleRecord(key, value, partition);
       mapOutputRecordCounter.increment(1);
       return;
@@ -390,7 +391,7 @@
     // Cast one of the operands to long to avoid integer overflow
     kvindex = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
     if (LOG.isInfoEnabled()) {
-      LOG.info(outputContext.getDestinationVertexName() + ": " + "(EQUATOR) " + pos + " kvi " + kvindex +
+      LOG.info(outputContext.getInputOutputVertexNames() + ": " + "(EQUATOR) " + pos + " kvi " + kvindex +
           "(" + (kvindex * 4) + ")");
     }
   }
@@ -408,7 +409,7 @@
     // Cast one of the operands to long to avoid integer overflow
     kvstart = kvend = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
     if (LOG.isInfoEnabled()) {
-      LOG.info(outputContext.getDestinationVertexName() + ": " + "(RESET) equator " + e + " kv " + kvstart + "(" +
+      LOG.info(outputContext.getInputOutputVertexNames() + ": " + "(RESET) equator " + e + " kv " + kvstart + "(" +
         (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
     }
   }
@@ -664,7 +665,7 @@
       spillThread.interrupt();
       spillThread.join();
     } catch (InterruptedException e) {
-      LOG.info(outputContext.getDestinationVertexName() + ": " + "Spill thread interrupted");
+      LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Spill thread interrupted");
       //Reset status
       Thread.currentThread().interrupt();
       throw new IOInterruptedException("Spill failed", e);
@@ -673,7 +674,7 @@
 
   @Override
   public void flush() throws IOException {
-    LOG.info(outputContext.getDestinationVertexName() + ": " + "Starting flush of map output");
+    LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Starting flush of map output");
     outputContext.notifyProgress();
     if (Thread.currentThread().isInterrupted()) {
       /**
@@ -710,7 +711,7 @@
         bufend = bufmark;
         if (LOG.isInfoEnabled()) {
           LOG.info(
-              outputContext.getDestinationVertexName() + ": " + "Sorting & Spilling map output. "
+              outputContext.getInputOutputVertexNames() + ": " + "Sorting & Spilling map output. "
                   + "bufstart = " + bufstart + ", bufend = " + bufmark + ", bufvoid = " + bufvoid
                   + "; " + "kvstart=" + kvstart + "(" + (kvstart * 4) + ")"
                   + ", kvend = " + kvend + "(" + (kvend * 4) + ")"
@@ -781,7 +782,7 @@
             spillLock.unlock();
             sortAndSpill(sameKeyCount, totalKeysCount);
           } catch (Throwable t) {
-            LOG.warn(outputContext.getDestinationVertexName() + ": " + "Got an exception in sortAndSpill", t);
+            LOG.warn(outputContext.getInputOutputVertexNames() + ": " + "Got an exception in sortAndSpill", t);
             sortSpillException = t;
           } finally {
             spillLock.lock();
@@ -794,7 +795,7 @@
           }
         }
       } catch (InterruptedException e) {
-        LOG.info(outputContext.getDestinationVertexName() + ": " + "Spill thread interrupted");
+        LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Spill thread interrupted");
         Thread.currentThread().interrupt();
       } finally {
         spillLock.unlock();
@@ -830,7 +831,7 @@
     bufend = bufmark;
     spillInProgress = true;
     if (LOG.isInfoEnabled()) {
-      LOG.info(outputContext.getDestinationVertexName() + ": Spilling map output."
+      LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling map output."
           + "bufstart=" + bufstart + ", bufend = " + bufmark + ", bufvoid = " + bufvoid
           +"; kvstart=" + kvstart + "(" + (kvstart * 4) + ")"
           +", kvend = " + kvend + "(" + (kvend * 4) + ")"
@@ -936,7 +937,7 @@
               TezRawKeyValueIterator kvIter =
                 new MRResultIterator(spstart, spindex);
               if (LOG.isDebugEnabled()) {
-                LOG.debug(outputContext.getDestinationVertexName() + ": " + "Running combine processor");
+                LOG.debug(outputContext.getInputOutputVertexNames() + ": " + "Running combine processor");
               }
               runCombineProcessor(kvIter, writer);
             }
@@ -975,7 +976,7 @@
         totalIndexCacheMemory +=
           spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
       }
-      LOG.info(outputContext.getDestinationVertexName() + ": " + "Finished spill " + numSpills
+      LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Finished spill " + numSpills
       + " at " + filename.toString());
       ++numSpills;
       if (!isFinalMergeEnabled()) {
@@ -1172,7 +1173,7 @@
         outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent,
         partitionStats, reportDetailedPartitionStats(), auxiliaryService, deflater);
 
-    LOG.info(outputContext.getDestinationVertexName() + ": " +
+    LOG.info(outputContext.getInputOutputVertexNames() + ": " +
         "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);
 
     if (sendEvent) {
@@ -1339,7 +1340,7 @@
             segmentList.add(s);
           }
           if (LOG.isDebugEnabled()) {
-            LOG.debug(outputContext.getDestinationVertexName() + ": "
+            LOG.debug(outputContext.getInputOutputVertexNames() + ": "
                 + "TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
                 "Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
                 indexRecord.getRawLength() + ", " +
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 5ff2944..faf7586 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -104,7 +104,7 @@
   // Maybe setup a separate statistics class which can be shared between the
   // buffer and the main path instead of having multiple arrays.
 
-  private final String destNameTrimmed;
+  private final String sourceDestNameTrimmed;
   private final long availableMemory;
   @VisibleForTesting
   final WrappedBuffer[] buffers;
@@ -206,7 +206,8 @@
 
     Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes");
 
-    this.destNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName());
+    this.sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getTaskVertexName()) + " -> "
+        + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName());
     //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in
     // this case.  Add it later if needed.
     boolean pipelinedShuffleConf = this.conf.getBoolean(TezRuntimeConfiguration
@@ -257,7 +258,7 @@
     buffers[0] = new WrappedBuffer(numOutputs, sizePerBuffer);
     numInitializedBuffers = 1;
     if (LOG.isDebugEnabled()) {
-      LOG.debug(destNameTrimmed + ": " + "Initializing Buffer #" +
+      LOG.debug(sourceDestNameTrimmed + ": " + "Initializing Buffer #" +
           numInitializedBuffers + " with size=" + sizePerBuffer);
     }
     currentBuffer = buffers[0];
@@ -313,7 +314,7 @@
       skipBuffers = false;
       writer = null;
     }
-    LOG.info(destNameTrimmed + ": "
+    LOG.info(sourceDestNameTrimmed + ": "
         + "numBuffers=" + numBuffers
         + ", sizePerBuffer=" + sizePerBuffer
         + ", skipBuffers=" + skipBuffers
@@ -493,7 +494,7 @@
       // Update overall stats
       final int filledBufferCount = filledBuffers.size();
       if (LOG.isDebugEnabled() || (filledBufferCount % 10) == 0) {
-        LOG.info(destNameTrimmed + ": " + "Moving to next buffer. Total filled buffers: " + filledBufferCount);
+        LOG.info(sourceDestNameTrimmed + ": " + "Moving to next buffer. Total filled buffers: " + filledBufferCount);
       }
       updateGlobalStats(currentBuffer);
 
@@ -531,7 +532,7 @@
 
       final int filledBufferCount = filledBuffers.size();
       if (LOG.isDebugEnabled() || (filledBufferCount % 10) == 0) {
-        LOG.info(destNameTrimmed + ": triggering spill. filledBuffers.size=" + filledBufferCount);
+        LOG.info(sourceDestNameTrimmed + ": triggering spill. filledBuffers.size=" + filledBufferCount);
       }
       pendingSpillCount.incrementAndGet();
       int spillNumber = numSpills.getAndIncrement();
@@ -673,10 +674,10 @@
       spillResult = new SpillResult(compressedLength, this.filledBuffers);
 
       handleSpillIndex(spillPathDetails, spillRecord);
-      LOG.info(destNameTrimmed + ": " + "Finished spill " + spillIndex);
+      LOG.info(sourceDestNameTrimmed + ": " + "Finished spill " + spillIndex);
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug(destNameTrimmed + ": " + "Spill=" + spillIndex + ", indexPath="
+        LOG.debug(sourceDestNameTrimmed + ": " + "Spill=" + spillIndex + ", indexPath="
             + spillPathDetails.indexFilePath + ", outputPath=" + spillPathDetails.outputFilePath);
       }
       return spillResult;
@@ -754,7 +755,8 @@
     isShutdown.set(true);
     spillLock.lock();
     try {
-      LOG.info(destNameTrimmed + ": " + "Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
+      LOG.info(
+          sourceDestNameTrimmed + ": " + "Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
       while (pendingSpillCount.get() != 0 && spillException == null) {
         spillInProgress.await();
       }
@@ -762,7 +764,7 @@
       spillLock.unlock();
     }
     if (spillException != null) {
-      LOG.error(destNameTrimmed + ": " + "Error during spill, throwing");
+      LOG.error(sourceDestNameTrimmed + ": " + "Error during spill, throwing");
       // Assuming close will be called on the same thread as the write
       cleanup();
       currentBuffer.cleanup();
@@ -773,7 +775,7 @@
         throw new IOException(spillException);
       }
     } else {
-      LOG.info(destNameTrimmed + ": " + "All spills complete");
+      LOG.info(sourceDestNameTrimmed + ": " + "All spills complete");
       // Assuming close will be called on the same thread as the write
       cleanup();
 
@@ -1082,7 +1084,8 @@
       for (int i = 0; i < numPartitions; i++) {
         long segmentStart = out.getPos();
         if (numRecordsPerPartition[i] == 0) {
-          LOG.info(destNameTrimmed + ": " + "Skipping partition: " + i + " in final merge since it has no records");
+          LOG.info(
+              sourceDestNameTrimmed + ": " + "Skipping partition: " + i + " in final merge since it has no records");
           continue;
         }
         writer = new Writer(keySerialization, valSerialization, out, keyClass, valClass, codec, null, null);
@@ -1136,7 +1139,7 @@
     }
     finalSpillRecord.writeToFile(finalIndexPath, conf, localFs);
     fileOutputBytesCounter.increment(indexFileSizeEstimate);
-    LOG.info(destNameTrimmed + ": " + "Finished final spill after merging : " + numSpills.get() + " spills");
+    LOG.info(sourceDestNameTrimmed + ": " + "Finished final spill after merging : " + numSpills.get() + " spills");
   }
 
   private void deleteIntermediateSpills() {
@@ -1208,9 +1211,10 @@
       mayBeSendEventsForSpill(emptyPartitions, sizePerPartition,
           spillIndex, false);
 
-      LOG.info(destNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file " + spillIndex);
+      LOG.info(sourceDestNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file "
+          + spillIndex);
       if (LOG.isDebugEnabled()) {
-        LOG.debug(destNameTrimmed + ": " + "LargeRecord Spill=" + spillIndex + ", indexPath="
+        LOG.debug(sourceDestNameTrimmed + ": " + "LargeRecord Spill=" + spillIndex + ", indexPath="
             + spillPathDetails.indexFilePath + ", outputPath="
             + spillPathDetails.outputFilePath);
       }
@@ -1346,7 +1350,7 @@
     try {
       events = generateEventForSpill(emptyPartitions, sizePerPartition, spillNumber,
           isFinalUpdate);
-      LOG.info(destNameTrimmed + ": " + "Adding spill event for spill"
+      LOG.info(sourceDestNameTrimmed + ": " + "Adding spill event for spill"
           + " (final update=" + isFinalUpdate + "), spillId=" + spillNumber);
       if (pipelinedShuffle) {
         //Send out an event for consuming.
@@ -1355,7 +1359,7 @@
         this.finalEvents.addAll(events);
       }
     } catch (IOException e) {
-      LOG.error(destNameTrimmed + ": " + "Error in sending pipelined events", e);
+      LOG.error(sourceDestNameTrimmed + ": " + "Error in sending pipelined events", e);
       outputContext.reportFailure(TaskFailureType.NON_FATAL, e,
           "Error in sending events.");
     }
@@ -1414,7 +1418,7 @@
           availableBuffers.add(buffer);
         }
       } catch (Throwable e) {
-        LOG.error(destNameTrimmed + ": Failure while attempting to reset buffer after spill", e);
+        LOG.error(sourceDestNameTrimmed + ": Failure while attempting to reset buffer after spill", e);
         outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Failure while attempting to reset buffer after spill");
       }
 
@@ -1444,7 +1448,7 @@
     public void onFailure(Throwable t) {
       // spillException setup to throw an exception back to the user. Requires synchronization.
       // Consider removing it in favor of having Tez kill the task
-      LOG.error(destNameTrimmed + ": " + "Failure while spilling to disk", t);
+      LOG.error(sourceDestNameTrimmed + ": " + "Failure while spilling to disk", t);
       spillException = t;
       outputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Failure while spilling to disk");
       spillLock.lock();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 676fe17..44cb9d6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -135,7 +135,7 @@
 
       if (pipelinedShuffle) {
         if (finalMergeEnabled) {
-          LOG.info(getContext().getDestinationVertexName() + " disabling final merge as "
+          LOG.info(getContext().getInputOutputVertexNames() + " disabling final merge as "
               + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " is enabled.");
           finalMergeEnabled = false;
           conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
@@ -194,7 +194,7 @@
       returnEvents.addAll(generateEvents());
       sorter = null;
     } else {
-      LOG.warn(getContext().getDestinationVertexName() +
+      LOG.warn(getContext().getInputOutputVertexNames() +
           ": Attempting to close output {} of type {} before it was started. Generating empty events",
           getContext().getDestinationVertexName(), this.getClass().getSimpleName());
       returnEvents = generateEmptyEvents();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index e7a4429..bcacc52 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -106,7 +106,7 @@
       this.kvWriter = new UnorderedPartitionedKVWriter(getContext(), conf, 1,
           memoryUpdateCallbackHandler.getMemoryAssigned());
       isStarted.set(true);
-      LOG.info(getContext().getDestinationVertexName() + " started. MemoryAssigned="
+      LOG.info(getContext().getInputOutputVertexNames() + " started. MemoryAssigned="
           + memoryUpdateCallbackHandler.getMemoryAssigned());
     }
   }
@@ -130,7 +130,7 @@
       returnEvents = kvWriter.close();
       kvWriter = null;
     } else {
-      LOG.warn(getContext().getDestinationVertexName() +
+      LOG.warn(getContext().getInputOutputVertexNames() +
           ": Attempting to close output {} of type {} before it was started. Generating empty events",
           getContext().getDestinationVertexName(), this.getClass().getSimpleName());
       returnEvents = new LinkedList<Event>();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 439b732..9bc7ea4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -108,7 +108,7 @@
       returnEvents = kvWriter.close();
       kvWriter = null;
     } else {
-      LOG.warn(getContext().getDestinationVertexName() +
+      LOG.warn(getContext().getInputOutputVertexNames() +
           ": Attempting to close output {} of type {} before it was started. Generating empty events",
           getContext().getDestinationVertexName(), this.getClass().getSimpleName());
       returnEvents = new LinkedList<Event>();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
index b81c2bd..a7c7ca2 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
@@ -69,6 +69,7 @@
     }).when(ctx).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class));
     doReturn(conf).when(ctx).getContainerConfiguration();
     doReturn(TezUtils.createUserPayloadFromConf(userPayloadConf)).when(ctx).getUserPayload();
+    doReturn("taskVertex").when(ctx).getTaskVertexName();
     doReturn("destinationVertex").when(ctx).getDestinationVertexName();
     doReturn("UUID").when(ctx).getUniqueIdentifier();
     doReturn(new String[] { workingDir.toString() }).when(ctx).getWorkDirs();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
index 2c9c3b2..7999d45 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -406,6 +406,7 @@
     doReturn(payLoad).when(context).getUserPayload();
     doReturn(5 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask();
     doReturn(UniqueID).when(context).getUniqueIdentifier();
+    doReturn("v0").when(context).getTaskVertexName();
     doReturn("v1").when(context).getDestinationVertexName();
     doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context)
         .getServiceProviderMetaData