TEZ-1119. Support display of user payloads in Tez UI. (hitesh)
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index db434ae..c15324c 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -19,7 +19,11 @@
 package org.apache.tez.common;
 
 import java.io.IOException;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -32,6 +36,8 @@
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 
+import com.google.protobuf.ByteString;
+
 public class TezCommonUtils {
   public static final FsPermission TEZ_AM_DIR_PERMISSION = FsPermission
       .createImmutable((short) 0700); // rwx--------
@@ -211,8 +217,6 @@
    * 
    * @param recoveryPath
    *          TEZ recovery directory used for Tez internals
-   * @param conf
-   *          Tez configuration
    * @param attemptID
    *          Application Attempt Id
    * @return App attempt specific recovery path
@@ -283,4 +287,23 @@
   public static FSDataOutputStream createFileForAM(FileSystem fs, Path filePath) throws IOException {
     return FileSystem.create(fs, filePath, new FsPermission(TEZ_AM_FILE_PERMISSION));
   }
+
+  @Private
+  public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException {
+    ByteString.Output os = ByteString.newOutput();
+    DeflaterOutputStream compressOs = new DeflaterOutputStream(os, new Deflater(
+        Deflater.BEST_COMPRESSION));
+    compressOs.write(inBytes);
+    compressOs.finish();
+    ByteString byteString = os.toByteString();
+    return byteString;
+  }
+
+  @Private
+  public static byte[] decompressByteStringToByteArray(ByteString byteString) throws IOException {
+    InflaterInputStream in = new InflaterInputStream(byteString.newInput());
+    byte[] bytes = IOUtils.toByteArray(in);
+    return bytes;
+  }
+
 }
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 87573f3..87592e7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezAppMasterStatus;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
@@ -277,9 +278,30 @@
       builder
           .setUserPayload(ByteString.copyFrom(descriptor.getUserPayload()));
     }
+    if (descriptor.getHistoryText() != null) {
+      try {
+        builder.setHistoryText(TezCommonUtils.compressByteArrayToByteString(
+            descriptor.getHistoryText().getBytes()));
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
+    }
     return builder.build();
   }
 
+  public static String getHistoryTextFromProto(TezEntityDescriptorProto proto) {
+    if (!proto.hasHistoryText()) {
+      return null;
+    }
+    try {
+      return new String(TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText()));
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+
+
   public static RootInputLeafOutputProto convertToDAGPlan(
       RootInputLeafOutput<? extends TezEntityDescriptor> descriptor) {
     RootInputLeafOutputProto.Builder builder = RootInputLeafOutputProto.newBuilder();
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
index cb5e84f..194492f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
@@ -29,4 +29,16 @@
     this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     return this;
   }
+
+  /**
+   * Provide a human-readable version of the user payload that can be
+   * used in the History UI
+   * @param historyText History text
+   */
+  @Override
+  public EdgeManagerDescriptor setHistoryText(String historyText) {
+    super.setHistoryText(historyText);
+    return this;
+  }
+
 }
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
index cc6948c..0d01262 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
@@ -33,7 +33,19 @@
 
   @Override
   public InputDescriptor setUserPayload(byte[] userPayload) {
-    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+    super.setUserPayload(userPayload);
     return this;
   }
+
+  /**
+   * Provide a human-readable version of the user payload that can be
+   * used in the History UI
+   * @param historyText History text
+   */
+  @Override
+  public InputDescriptor setHistoryText(String historyText) {
+    super.setHistoryText(historyText);
+    return this;
+  }
+
 }
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
index a34d35c..e9cbe51 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
@@ -33,7 +33,19 @@
 
   @Override
   public OutputDescriptor setUserPayload(byte[] userPayload) {
-    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+    super.setUserPayload(userPayload);
     return this;
   }
+
+  /**
+   * Provide a human-readable version of the user payload that can be
+   * used in the History UI
+   * @param historyText History text
+   */
+  @Override
+  public OutputDescriptor setHistoryText(String historyText) {
+    super.setHistoryText(historyText);
+    return this;
+  }
+
 }
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
index a0e574d..4641d93 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
@@ -31,8 +31,21 @@
     super(processorClassName);
   }
 
+  @Override
   public ProcessorDescriptor setUserPayload(byte[] userPayload) {
-    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+    super.setUserPayload(userPayload);
     return this;
   }
+
+  /**
+   * Provide a human-readable version of the user payload that can be
+   * used in the History UI
+   * @param historyText History text
+   */
+  @Override
+  public ProcessorDescriptor setHistoryText(String historyText) {
+    super.setHistoryText(historyText);
+    return this;
+  }
+
 }
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
index 25788ff..1047b09 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
@@ -31,6 +31,7 @@
 
   protected TezUserPayload userPayload;
   private String className;
+  protected String historyText;
 
   @Private // for Writable
   public TezEntityDescriptor() {
@@ -44,11 +45,31 @@
     return (userPayload == null) ? null : userPayload.getPayload();
   }
 
+  /**
+   * Set user payload for this entity descriptor
+   * @param userPayload User Payload
+   * @return
+   */
   public TezEntityDescriptor setUserPayload(byte[] userPayload) {
     this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     return this;
   }
 
+  /**
+   * Provide a human-readable version of the user payload that can be
+   * used in the History UI
+   * @param historyText History text
+   */
+  public TezEntityDescriptor setHistoryText(String historyText) {
+    this.historyText = historyText;
+    return this;
+  }
+
+  @Private // Internal use only
+  public String getHistoryText() {
+    return this.historyText;
+  }
+
   public String getClassName() {
     return this.className;
   }
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
index 58980b5..3e72523 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
@@ -36,4 +36,16 @@
     this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     return this;
   }
+
+  /**
+   * Provide a human-readable version of the user payload that can be
+   * used in the History UI
+   * @param historyText History text
+   */
+  @Override
+  public VertexManagerPluginDescriptor setHistoryText(String historyText) {
+    super.setHistoryText(historyText);
+    return this;
+  }
+
 }
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index c7a317e..9fa9cf2 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -113,6 +113,7 @@
 message TezEntityDescriptorProto {
   optional string class_name = 1;
   optional bytes user_payload = 2;
+  optional bytes history_text = 3;
 }
 
 message RootInputLeafOutputProto {
diff --git a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
index 121c673..f52f69b 100644
--- a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.common;
 
 import java.io.File;
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
new file mode 100644
index 0000000..ed2d8bd
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.io.IOException;
+
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDagTypeConverters {
+
+  @Test
+  public void testTezEntityDescriptorSerialization() throws IOException {
+    byte[] payload = new String("Foobar").getBytes();
+    String historytext = "Bar123";
+    TezEntityDescriptor entityDescriptor =
+        new InputDescriptor("inputClazz").setUserPayload(payload)
+        .setHistoryText(historytext);
+    TezEntityDescriptorProto proto =
+        DagTypeConverters.convertToDAGPlan(entityDescriptor);
+    Assert.assertArrayEquals(payload, proto.getUserPayload().toByteArray());
+    Assert.assertTrue(proto.hasHistoryText());
+    Assert.assertNotEquals(historytext, proto.getHistoryText());
+    Assert.assertEquals(historytext, new String(
+        TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText())));
+
+    // Ensure that the history text is not deserialized
+    InputDescriptor inputDescriptor =
+        DagTypeConverters.convertInputDescriptorFromDAGPlan(proto);
+    Assert.assertNull(inputDescriptor.getHistoryText());
+  }
+
+}
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index 43e9fa7..7deed48 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -1,5 +1,22 @@
-package org.apache.tez.dag.api.client.rpc;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
+package org.apache.tez.dag.api.client.rpc;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.*;
@@ -43,7 +60,6 @@
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
-
 public class TestDAGClient {
 
   private DAGClientRPCImpl dagClient;
diff --git a/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java b/tez-common/src/main/java/org/apache/tez/common/ReflectionUtils.java
similarity index 99%
rename from tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java
rename to tez-common/src/main/java/org/apache/tez/common/ReflectionUtils.java
index d32f3cd..3b9ac81 100644
--- a/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/ReflectionUtils.java
@@ -30,7 +30,7 @@
 import org.apache.tez.dag.api.TezUncheckedException;
 
 @Private
-public class RuntimeUtils {
+public class ReflectionUtils {
   
   private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
 
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index cfa3413..f73f223 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -229,24 +229,6 @@
     return output;
   }
 
-  @Private
-  public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException {
-    ByteString.Output os = ByteString.newOutput();
-    DeflaterOutputStream compressOs = new DeflaterOutputStream(os, new Deflater(
-        Deflater.BEST_COMPRESSION));
-    compressOs.write(inBytes);
-    compressOs.finish();
-    ByteString byteString = os.toByteString();
-    return byteString;
-  }
-  
-  @Private
-  public static byte[] decompressByteStringToByteArray(ByteString byteString) throws IOException {
-    InflaterInputStream in = new InflaterInputStream(byteString.newInput());
-    byte[] bytes = IOUtils.toByteArray(in);
-    return bytes;
-  }
-
   private static final Pattern pattern = Pattern.compile("\\W");
   @Private
   public static final int MAX_VERTEX_NAME_LENGTH = 40;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index cbd48fa..6dca990 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.dag.api.client;
 
 import java.util.Collections;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index c4a1085..3d20881 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -33,7 +33,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
@@ -101,7 +101,7 @@
     String className = input.getInitializerClassName();
     @SuppressWarnings("unchecked")
     Class<? extends TezRootInputInitializer> clazz =
-        (Class<? extends TezRootInputInitializer>) RuntimeUtils
+        (Class<? extends TezRootInputInitializer>) ReflectionUtils
             .getClazz(className);
     TezRootInputInitializer initializer = null;
     try {
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index a6004cf..e2a9a27 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -25,7 +25,7 @@
 
 import javax.annotation.Nullable;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManager;
@@ -125,7 +125,7 @@
       case CUSTOM:
         if (edgeProperty.getEdgeManagerDescriptor() != null) {
           String edgeManagerClassName = edgeProperty.getEdgeManagerDescriptor().getClassName();
-          edgeManager = RuntimeUtils.createClazzInstance(edgeManagerClassName);
+          edgeManager = ReflectionUtils.createClazzInstance(edgeManagerClassName);
         }
         break;
       default:
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a68093a..3162d6c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -54,7 +54,7 @@
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
@@ -1657,7 +1657,7 @@
         dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
           @Override
           public Void run() throws Exception {
-            OutputCommitter outputCommitter = RuntimeUtils.createClazzInstance(
+            OutputCommitter outputCommitter = ReflectionUtils.createClazzInstance(
                 od.getInitializerClassName());
             OutputCommitterContext outputCommitterContext =
                 new OutputCommitterContextImpl(appContext.getApplicationID(),
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 35c3943..3eb9ca1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -32,7 +32,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -235,7 +235,7 @@
   public void initialize() {
     pluginContext = new VertexManagerPluginContextImpl();
     if (pluginDesc != null) {
-      plugin = RuntimeUtils.createClazzInstance(pluginDesc.getClassName());
+      plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName());
       payload = DagTypeConverters.convertToTezUserPayload(pluginDesc.getUserPayload());
     }
     if (payload == null || payload.getPayload() == null) {
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 82e063a..e8e45a3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -24,7 +24,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.history.logging.HistoryLoggingService;
@@ -57,7 +57,7 @@
         TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
 
     historyLoggingService =
-        RuntimeUtils.createClazzInstance(historyServiceClassName);
+        ReflectionUtils.createClazzInstance(historyServiceClassName);
     historyLoggingService.setAppContext(context);
     addService(historyLoggingService);
 
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 3997c2f..b58ee9b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.history.utils;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -26,6 +27,8 @@
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
@@ -35,125 +38,58 @@
 
 public class DAGUtils {
 
+  static final String DAG_NAME_KEY = "dagName";
+  static final String VERTICES_KEY = "vertices";
+  static final String EDGES_KEY = "edges";
+  static final String VERTEX_GROUPS_KEY = "vertexGroups";
+
+  static final String VERTEX_NAME_KEY = "vertexName";
+  static final String PROCESSOR_CLASS_KEY = "processorClass";
+  static final String IN_EDGE_IDS_KEY = "inEdgeIds";
+  static final String OUT_EDGE_IDS_KEY = "outEdgeIds";
+  static final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
+  static final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
+  static final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
+      "vertexManagerPluginClass";
+  static final String USER_PAYLOAD_AS_TEXT = "userPayloadAsText";
+  static final String OUTPUT_USER_PAYLOAD_AS_TEXT = "outputUserPayloadAsText";
+  static final String INPUT_USER_PAYLOAD_AS_TEXT = "inputUserPayloadAsText";
+
+  static final String EDGE_ID_KEY = "edgeId";
+  static final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
+  static final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
+  static final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
+  static final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
+  static final String SCHEDULING_TYPE_KEY = "schedulingType";
+  static final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
+  static final String EDGE_DESTINATION_CLASS_KEY =
+      "edgeDestinationClass";
+
+  static final String NAME_KEY = "name";
+  static final String CLASS_KEY = "class";
+  static final String INITIALIZER_KEY = "initializer";
+
+  static final String VERTEX_GROUP_NAME_KEY = "groupName";
+  static final String VERTEX_GROUP_MEMBERS_KEY = "groupMembers";
+  static final String VERTEX_GROUP_OUTPUTS_KEY = "outputs";
+  static final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
+  static final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";
+
+
+
   public static JSONObject generateSimpleJSONPlan(DAGProtos.DAGPlan dagPlan) throws JSONException {
-
-    final String DAG_NAME_KEY = "dagName";
-    final String VERTICES_KEY = "vertices";
-    final String EDGES_KEY = "edges";
-
-    final String VERTEX_NAME_KEY = "vertexName";
-    final String PROCESSOR_CLASS_KEY = "processorClass";
-    final String IN_EDGE_IDS_KEY = "inEdgeIds";
-    final String OUT_EDGE_IDS_KEY = "outEdgeIds";
-    final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
-    final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
-    final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
-        "vertexManagerPluginClass";
-
-    final String EDGE_ID_KEY = "edgeId";
-    final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
-    final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
-    final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
-    final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
-    final String SCHEDULING_TYPE_KEY = "schedulingType";
-    final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
-    final String EDGE_DESTINATION_CLASS_KEY =
-        "edgeDestinationClass";
-
-    final String NAME_KEY = "name";
-    final String CLASS_KEY = "class";
-    final String INITIALIZER_KEY = "initializer";
-
-    JSONObject dagJson = new JSONObject();
-    dagJson.put(DAG_NAME_KEY, dagPlan.getName());
-    for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
-      JSONObject vertexJson = new JSONObject();
-      vertexJson.put(VERTEX_NAME_KEY, vertexPlan.getName());
-
-      if (vertexPlan.hasProcessorDescriptor()) {
-        vertexJson.put(PROCESSOR_CLASS_KEY,
-            vertexPlan.getProcessorDescriptor().getClassName());
-      }
-
-      for (String inEdgeId : vertexPlan.getInEdgeIdList()) {
-        vertexJson.accumulate(IN_EDGE_IDS_KEY, inEdgeId);
-      }
-      for (String outEdgeId : vertexPlan.getOutEdgeIdList()) {
-        vertexJson.accumulate(OUT_EDGE_IDS_KEY, outEdgeId);
-      }
-
-      for (DAGProtos.RootInputLeafOutputProto input :
-          vertexPlan.getInputsList()) {
-        JSONObject jsonInput = new JSONObject();
-        jsonInput.put(NAME_KEY, input.getName());
-        jsonInput.put(CLASS_KEY, input.getEntityDescriptor().getClassName());
-        if (input.hasInitializerClassName()) {
-          jsonInput.put(INITIALIZER_KEY, input.getInitializerClassName());
-        }
-        vertexJson.accumulate(ADDITIONAL_INPUTS_KEY, jsonInput);
-      }
-
-      for (DAGProtos.RootInputLeafOutputProto output :
-          vertexPlan.getOutputsList()) {
-        JSONObject jsonOutput = new JSONObject();
-        jsonOutput.put(NAME_KEY, output.getName());
-        jsonOutput.put(CLASS_KEY, output.getEntityDescriptor().getClassName());
-        if (output.hasInitializerClassName()) {
-          jsonOutput.put(INITIALIZER_KEY, output.getInitializerClassName());
-        }
-        vertexJson.accumulate(ADDITIONAL_OUTPUTS_KEY, jsonOutput);
-      }
-
-      if (vertexPlan.hasVertexManagerPlugin()) {
-        vertexJson.put(VERTEX_MANAGER_PLUGIN_CLASS_KEY,
-            vertexPlan.getVertexManagerPlugin().getClassName());
-      }
-
-      dagJson.accumulate(VERTICES_KEY, vertexJson);
+    JSONObject dagJson;
+    try {
+      dagJson = new JSONObject(convertDAGPlanToATSMap(dagPlan));
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
     }
-
-    for (DAGProtos.EdgePlan edgePlan : dagPlan.getEdgeList()) {
-      JSONObject edgeJson = new JSONObject();
-      edgeJson.put(EDGE_ID_KEY, edgePlan.getId());
-      edgeJson.put(INPUT_VERTEX_NAME_KEY, edgePlan.getInputVertexName());
-      edgeJson.put(OUTPUT_VERTEX_NAME_KEY, edgePlan.getOutputVertexName());
-      edgeJson.put(DATA_MOVEMENT_TYPE_KEY,
-          edgePlan.getDataMovementType().name());
-      edgeJson.put(DATA_SOURCE_TYPE_KEY, edgePlan.getDataSourceType().name());
-      edgeJson.put(SCHEDULING_TYPE_KEY, edgePlan.getSchedulingType().name());
-      edgeJson.put(EDGE_SOURCE_CLASS_KEY,
-          edgePlan.getEdgeSource().getClassName());
-      edgeJson.put(EDGE_DESTINATION_CLASS_KEY,
-          edgePlan.getEdgeDestination().getClassName());
-
-      dagJson.accumulate(EDGES_KEY, edgeJson);
-    }
-
     return dagJson;
   }
 
   public static JSONObject convertCountersToJSON(TezCounters counters)
       throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    if (counters == null) {
-      return jsonObject;
-    }
-
-    for (CounterGroup group : counters) {
-      JSONObject jsonCGrp = new JSONObject();
-      jsonCGrp.put(ATSConstants.COUNTER_GROUP_NAME, group.getName());
-      jsonCGrp.put(ATSConstants.COUNTER_GROUP_DISPLAY_NAME,
-          group.getDisplayName());
-      for (TezCounter counter : group) {
-        JSONObject counterJson = new JSONObject();
-        counterJson.put(ATSConstants.COUNTER_NAME, counter.getName());
-        counterJson.put(ATSConstants.COUNTER_DISPLAY_NAME,
-            counter.getDisplayName());
-        counterJson.put(ATSConstants.COUNTER_VALUE, counter.getValue());
-        jsonCGrp.accumulate(ATSConstants.COUNTERS, counterJson);
-      }
-      jsonObject.accumulate(ATSConstants.COUNTER_GROUPS, jsonCGrp);
-    }
+    JSONObject jsonObject = new JSONObject(convertCountersToATSMap(counters));
     return jsonObject;
   }
 
@@ -185,44 +121,10 @@
   }
 
   public static Map<String,Object> convertDAGPlanToATSMap(
-      DAGProtos.DAGPlan dagPlan) {
+      DAGProtos.DAGPlan dagPlan) throws IOException {
 
     final String VERSION_KEY = "version";
     final int version = 1;
-    final String DAG_NAME_KEY = "dagName";
-    final String VERTICES_KEY = "vertices";
-    final String EDGES_KEY = "edges";
-    final String VERTEX_GROUPS_KEY = "vertexGroups";
-
-    final String VERTEX_NAME_KEY = "vertexName";
-    final String PROCESSOR_CLASS_KEY = "processorClass";
-    final String IN_EDGE_IDS_KEY = "inEdgeIds";
-    final String OUT_EDGE_IDS_KEY = "outEdgeIds";
-    final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
-    final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
-    final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
-        "vertexManagerPluginClass";
-
-    final String EDGE_ID_KEY = "edgeId";
-    final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
-    final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
-    final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
-    final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
-    final String SCHEDULING_TYPE_KEY = "schedulingType";
-    final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
-    final String EDGE_DESTINATION_CLASS_KEY =
-        "edgeDestinationClass";
-
-    final String NAME_KEY = "name";
-    final String CLASS_KEY = "class";
-    final String INITIALIZER_KEY = "initializer";
-
-    final String VERTEX_GROUP_NAME_KEY = "groupName";
-    final String VERTEX_GROUP_MEMBERS_KEY = "groupMembers";
-    final String VERTEX_GROUP_OUTPUTS_KEY = "outputs";
-    final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
-    final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";
-
     Map<String,Object> dagMap = new LinkedHashMap<String, Object>();
     dagMap.put(DAG_NAME_KEY, dagPlan.getName());
     dagMap.put(VERSION_KEY, version);
@@ -234,6 +136,11 @@
       if (vertexPlan.hasProcessorDescriptor()) {
         vertexMap.put(PROCESSOR_CLASS_KEY,
             vertexPlan.getProcessorDescriptor().getClassName());
+        if (vertexPlan.getProcessorDescriptor().hasHistoryText()) {
+          vertexMap.put(USER_PAYLOAD_AS_TEXT,
+              DagTypeConverters.getHistoryTextFromProto(
+                  vertexPlan.getProcessorDescriptor()));
+        }
       }
 
       ArrayList<Object> inEdgeIdList = new ArrayList<Object>();
@@ -253,6 +160,11 @@
         if (input.hasInitializerClassName()) {
           inputMap.put(INITIALIZER_KEY, input.getInitializerClassName());
         }
+        if (input.getEntityDescriptor().hasHistoryText()) {
+          inputMap.put(USER_PAYLOAD_AS_TEXT,
+              DagTypeConverters.getHistoryTextFromProto(
+                  input.getEntityDescriptor()));
+        }
         inputsList.add(inputMap);
       }
       putInto(vertexMap, ADDITIONAL_INPUTS_KEY, inputsList);
@@ -266,6 +178,11 @@
         if (output.hasInitializerClassName()) {
           outputMap.put(INITIALIZER_KEY, output.getInitializerClassName());
         }
+        if (output.getEntityDescriptor().hasHistoryText()) {
+          outputMap.put(USER_PAYLOAD_AS_TEXT,
+              DagTypeConverters.getHistoryTextFromProto(
+                  output.getEntityDescriptor()));
+        }
         outputsList.add(outputMap);
       }
       putInto(vertexMap, ADDITIONAL_OUTPUTS_KEY, outputsList);
@@ -293,7 +210,16 @@
           edgePlan.getEdgeSource().getClassName());
       edgeMap.put(EDGE_DESTINATION_CLASS_KEY,
           edgePlan.getEdgeDestination().getClassName());
-
+      if (edgePlan.getEdgeSource().hasHistoryText()) {
+        edgeMap.put(OUTPUT_USER_PAYLOAD_AS_TEXT,
+            DagTypeConverters.getHistoryTextFromProto(
+                edgePlan.getEdgeSource()));
+      }
+      if (edgePlan.getEdgeDestination().hasHistoryText()) {
+        edgeMap.put(INPUT_USER_PAYLOAD_AS_TEXT,
+            DagTypeConverters.getHistoryTextFromProto(
+                edgePlan.getEdgeDestination()));
+      }
       edgesList.add(edgeMap);
     }
     putInto(dagMap, EDGES_KEY, edgesList);
@@ -321,6 +247,11 @@
             && edgeMergedInputInfo.getMergedInput().hasClassName()) {
             edgeMergedInput.put(PROCESSOR_CLASS_KEY,
                 edgeMergedInputInfo.getMergedInput().getClassName());
+            if (edgeMergedInputInfo.getMergedInput().hasHistoryText()) {
+              edgeMergedInput.put(USER_PAYLOAD_AS_TEXT,
+                  DagTypeConverters.getHistoryTextFromProto(
+                      edgeMergedInputInfo.getMergedInput()));
+            }
           }
           edgeMergedInputs.add(edgeMergedInput);
         }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java
index 6539944..37b49d8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.dag.utils;
 
 import java.util.BitSet;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
index aae82f8..bcd65e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
@@ -32,7 +32,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezException;
 
 import com.google.common.collect.Lists;
@@ -55,7 +55,7 @@
   }
 
   public static void addUrlsToClassPath(List<URL> urls) {
-    RuntimeUtils.addResourcesToSystemClassLoader(urls);
+    ReflectionUtils.addResourcesToSystemClassLoader(urls);
   }
 
   private static Path downloadResource(String destName, URI uri, Configuration conf)
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
index 8b888ff..1146ce4 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.runtime.task;
 
 public interface ErrorReporter {
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
index 009df3c..4fb563c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.tez.dag.api.client;
 
 import static org.junit.Assert.*;
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index ed91e20..5c79f22 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -25,7 +25,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -76,7 +76,7 @@
     event.toProtoStream(os);
     os.flush();
     os.close();
-    deserializedEvent = RuntimeUtils.createClazzInstance(
+    deserializedEvent = ReflectionUtils.createClazzInstance(
         event.getClass().getName());
     LOG.info("Serialized event to byte array"
         + ", eventType=" + event.getEventType()
@@ -100,7 +100,7 @@
     SummaryEventProto summaryEventProto =
         SummaryEventProto.parseDelimitedFrom(
             new ByteArrayInputStream(os.toByteArray()));
-    deserializedEvent = RuntimeUtils.createClazzInstance(
+    deserializedEvent = ReflectionUtils.createClazzInstance(
         event.getClass().getName());
     ((SummaryEvent)deserializedEvent).fromSummaryProtoStream(summaryEventProto);
     return deserializedEvent;
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
new file mode 100644
index 0000000..f926471
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.utils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.runtime.api.OutputCommitter;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestDAGUtils {
+
+  private DAGPlan createDAG() {
+    // Create a plan with 3 vertices: A, B, C. Group(A,B)->C
+    Configuration conf = new Configuration(false);
+    int dummyTaskCount = 1;
+    Resource dummyTaskResource = Resource.newInstance(1, 1);
+    org.apache.tez.dag.api.Vertex v1 = new org.apache.tez.dag.api.Vertex("vertex1",
+        new ProcessorDescriptor("Processor").setHistoryText("vertex1 Processor HistoryText"),
+        dummyTaskCount, dummyTaskResource);
+    v1.addInput("input1", new InputDescriptor("input.class").setHistoryText("input HistoryText"),
+        null);
+    org.apache.tez.dag.api.Vertex v2 = new org.apache.tez.dag.api.Vertex("vertex2",
+        new ProcessorDescriptor("Processor").setHistoryText("vertex2 Processor HistoryText"),
+        dummyTaskCount, dummyTaskResource);
+    org.apache.tez.dag.api.Vertex v3 = new org.apache.tez.dag.api.Vertex("vertex3",
+        new ProcessorDescriptor("Processor").setHistoryText("vertex3 Processor HistoryText"),
+        dummyTaskCount, dummyTaskResource);
+
+    DAG dag = new DAG("testDag");
+    String groupName1 = "uv12";
+    org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
+    OutputDescriptor outDesc = new OutputDescriptor("output.class")
+        .setHistoryText("uvOut HistoryText");
+    uv12.addOutput("uvOut", outDesc, OutputCommitter.class);
+    v3.addOutput("uvOut", outDesc, OutputCommitter.class);
+
+    GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+            new OutputDescriptor("dummy output class").setHistoryText("Dummy History Text"),
+            new InputDescriptor("dummy input class").setHistoryText("Dummy History Text")),
+        new InputDescriptor("merge.class").setHistoryText("Merge HistoryText"));
+
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addEdge(e1);
+    return dag.createDag(conf);
+  }
+
+  @Test
+  public void testConvertDAGPlanToATSMap() throws IOException, JSONException {
+    DAGPlan dagPlan = createDAG();
+    Map<String, Object> atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan);
+    Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY));
+    Assert.assertEquals(dagPlan.getName(), atsMap.get(DAGUtils.DAG_NAME_KEY));
+    Assert.assertTrue(atsMap.containsKey("version"));
+    Assert.assertEquals(1, atsMap.get("version"));
+    Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTICES_KEY));
+    Assert.assertTrue(atsMap.containsKey(DAGUtils.EDGES_KEY));
+    Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTEX_GROUPS_KEY));
+
+    Assert.assertEquals(3, ((Collection<?>) atsMap.get(DAGUtils.VERTICES_KEY)).size());
+    Set<String> vNames = Sets.newHashSet("vertex1", "vertex2", "vertex3");
+
+    Set<String> inEdgeIds = new HashSet<String>();
+    Set<String> outEdgeIds = new HashSet<String>();
+
+    int additionalInputCount = 0;
+    int additionalOutputCount = 0;
+
+    for (Object o : ((Collection<?>) atsMap.get(DAGUtils.VERTICES_KEY))) {
+      Map<String, Object> v = (Map<String, Object>) o;
+      Assert.assertTrue(v.containsKey(DAGUtils.VERTEX_NAME_KEY));
+      Assert.assertTrue(v.containsKey(DAGUtils.PROCESSOR_CLASS_KEY));
+      Assert.assertTrue(v.containsKey(DAGUtils.USER_PAYLOAD_AS_TEXT));
+
+      if (v.containsKey(DAGUtils.IN_EDGE_IDS_KEY)) {
+        inEdgeIds.addAll(((Collection<String>) v.get(DAGUtils.IN_EDGE_IDS_KEY)));
+      }
+      if (v.containsKey(DAGUtils.OUT_EDGE_IDS_KEY)) {
+        outEdgeIds.addAll(((Collection<String>) v.get(DAGUtils.OUT_EDGE_IDS_KEY)));
+      }
+
+      String vName = (String) v.get(DAGUtils.VERTEX_NAME_KEY);
+      Assert.assertTrue(vNames.contains(vName));
+      String procPayload = vName + " Processor HistoryText";
+      Assert.assertEquals(procPayload, v.get(DAGUtils.USER_PAYLOAD_AS_TEXT));
+
+      if (v.containsKey(DAGUtils.ADDITIONAL_INPUTS_KEY)) {
+        additionalInputCount += ((Collection<?>) v.get(DAGUtils.ADDITIONAL_INPUTS_KEY)).size();
+        for (Object input : ((Collection<?>) v.get(DAGUtils.ADDITIONAL_INPUTS_KEY))) {
+          Map<String, Object> inputMap = (Map<String, Object>) input;
+          Assert.assertTrue(inputMap.containsKey(DAGUtils.NAME_KEY));
+          Assert.assertTrue(inputMap.containsKey(DAGUtils.CLASS_KEY));
+          Assert.assertFalse(inputMap.containsKey(DAGUtils.INITIALIZER_KEY));
+          Assert.assertEquals("input HistoryText", inputMap.get(DAGUtils.USER_PAYLOAD_AS_TEXT));
+        }
+      }
+
+      if (v.containsKey(DAGUtils.ADDITIONAL_OUTPUTS_KEY)) {
+        additionalOutputCount += ((Collection<?>) v.get(DAGUtils.ADDITIONAL_OUTPUTS_KEY)).size();
+        for (Object output : ((Collection<?>) v.get(DAGUtils.ADDITIONAL_OUTPUTS_KEY))) {
+          Map<String, Object> outputMap = (Map<String, Object>) output;
+          Assert.assertTrue(outputMap.containsKey(DAGUtils.NAME_KEY));
+          Assert.assertTrue(outputMap.containsKey(DAGUtils.CLASS_KEY));
+          Assert.assertTrue(outputMap.containsKey(DAGUtils.INITIALIZER_KEY));
+          Assert.assertEquals("uvOut HistoryText", outputMap.get(DAGUtils.USER_PAYLOAD_AS_TEXT));
+        }
+      }
+    }
+
+    // 1 input
+    Assert.assertEquals(1, additionalInputCount);
+    // 3 outputs due to vertex group
+    Assert.assertEquals(3, additionalOutputCount);
+
+    // 1 edge translates to 2 due to vertex group
+    Assert.assertEquals(2, inEdgeIds.size());
+    Assert.assertEquals(2, outEdgeIds.size());
+
+    for (Object o : ((Collection<?>) atsMap.get(DAGUtils.EDGES_KEY))) {
+      Map<String, Object> e = (Map<String, Object>) o;
+
+      Assert.assertTrue(inEdgeIds.contains(e.get(DAGUtils.EDGE_ID_KEY)));
+      Assert.assertTrue(outEdgeIds.contains(e.get(DAGUtils.EDGE_ID_KEY)));
+      Assert.assertTrue(e.containsKey(DAGUtils.INPUT_VERTEX_NAME_KEY));
+      Assert.assertTrue(e.containsKey(DAGUtils.OUTPUT_VERTEX_NAME_KEY));
+      Assert.assertEquals(DataMovementType.SCATTER_GATHER.name(),
+          e.get(DAGUtils.DATA_MOVEMENT_TYPE_KEY));
+      Assert.assertEquals(DataSourceType.PERSISTED.name(), e.get(DAGUtils.DATA_SOURCE_TYPE_KEY));
+      Assert.assertEquals(SchedulingType.SEQUENTIAL.name(), e.get(DAGUtils.SCHEDULING_TYPE_KEY));
+      Assert.assertEquals("dummy output class", e.get(DAGUtils.EDGE_SOURCE_CLASS_KEY));
+      Assert.assertEquals("dummy input class", e.get(DAGUtils.EDGE_DESTINATION_CLASS_KEY));
+      Assert.assertEquals("Dummy History Text", e.get(DAGUtils.OUTPUT_USER_PAYLOAD_AS_TEXT));
+      Assert.assertEquals("Dummy History Text", e.get(DAGUtils.INPUT_USER_PAYLOAD_AS_TEXT));
+    }
+
+    for (Object o : ((Collection<?>) atsMap.get(DAGUtils.VERTEX_GROUPS_KEY))) {
+      Map<String, Object> e = (Map<String, Object>) o;
+      Assert.assertEquals("uv12", e.get(DAGUtils.VERTEX_GROUP_NAME_KEY));
+      Assert.assertTrue(e.containsKey(DAGUtils.VERTEX_GROUP_MEMBERS_KEY));
+      Assert.assertTrue(e.containsKey(DAGUtils.VERTEX_GROUP_OUTPUTS_KEY));
+      Assert.assertTrue(e.containsKey(DAGUtils.VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY));
+    }
+  }
+
+}
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 1c6d45e..051bfee 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.mapreduce.examples;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -198,12 +199,16 @@
 
     List<Vertex> vertices = new ArrayList<Vertex>();
 
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
+    mapStageConf.writeXml(outputStream);
+    String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
     byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
     byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
       TextInputFormat.class.getName());
     int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
     Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
-        MapProcessor.class.getName()).setUserPayload(mapPayload),
+        MapProcessor.class.getName()).setUserPayload(mapPayload)
+            .setHistoryText(mapStageHistoryText),
         numMaps, MRHelpers.getMapResource(mapStageConf));
     if (generateSplitsInClient) {
       mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
@@ -222,19 +227,27 @@
     MRHelpers.addMRInput(mapVertex, mapInputPayload, initializerClazz);
     vertices.add(mapVertex);
 
+    ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
+    iReduceStageConf.writeXml(iROutputStream);
+    String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
     Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
-        ReduceProcessor.class.getName()).
-        setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf)),
-        2,
-        MRHelpers.getReduceResource(iReduceStageConf));
+        ReduceProcessor.class.getName())
+            .setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf))
+            .setHistoryText(iReduceStageHistoryText),
+        2, MRHelpers.getReduceResource(iReduceStageConf));
     ivertex.setTaskLocalFiles(commonLocalResources);
     vertices.add(ivertex);
 
+    ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
+    finalReduceConf.writeXml(finalReduceOutputStream);
+    String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
     byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
     Vertex finalReduceVertex = new Vertex("finalreduce",
         new ProcessorDescriptor(
-            ReduceProcessor.class.getName()).setUserPayload(finalReducePayload),
-                1, MRHelpers.getReduceResource(finalReduceConf));
+            ReduceProcessor.class.getName())
+                .setUserPayload(finalReducePayload)
+                .setHistoryText(finalReduceStageHistoryText), 1,
+        MRHelpers.getReduceResource(finalReduceConf));
     finalReduceVertex.setTaskLocalFiles(commonLocalResources);
     MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload);
     vertices.add(finalReduceVertex);
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index 6e94c22..76da547 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -29,8 +29,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 import com.google.common.base.Preconditions;
@@ -85,7 +84,7 @@
       Class<? extends InputFormat> clazz = (Class<? extends InputFormat>) 
           getClassFromName(split.wrappedInputFormatName);
       try {
-        wrappedInputFormat = ReflectionUtils.newInstance(clazz, conf);
+        wrappedInputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
       } catch (Exception e) {
         throw new TezUncheckedException(e);
       }
@@ -93,7 +92,7 @@
   }
 
   static Class<?> getClassFromName(String name) {
-    return RuntimeUtils.getClazz(name);
+    return ReflectionUtils.getClazz(name);
   }
 
   public class TezGroupedSplitsRecordReader implements RecordReader<K, V> {
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index 5fa3e79..3fd0b6e 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -31,8 +31,7 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 
 import com.google.common.base.Preconditions;
@@ -120,7 +119,7 @@
       Class<? extends InputFormat> clazz = (Class<? extends InputFormat>) 
           getClassFromName(split.wrappedInputFormatName);
       try {
-        wrappedInputFormat = ReflectionUtils.newInstance(clazz, conf);
+        wrappedInputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
       } catch (Exception e) {
         throw new TezUncheckedException(e);
       }
@@ -128,7 +127,7 @@
   }
   
   static Class<?> getClassFromName(String name) {
-    return RuntimeUtils.getClazz(name);
+    return ReflectionUtils.getClazz(name);
   }
   
   public class TezGroupedSplitsRecordReader  extends RecordReader<K, V> {
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 70f56d4..f078f1d 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -18,8 +18,11 @@
 
 package org.apache.tez.dag.history.logging.ats;
 
+import java.io.IOException;
+
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.AMLaunchedEvent;
@@ -277,8 +280,12 @@
     atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
     atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName());
 
-    atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
-        DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+    try {
+      atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
+          DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
     atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID,
         event.getApplicationAttemptId().getApplicationId().toString());
 
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 881ae90..122cc23 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -42,7 +42,7 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -503,7 +503,7 @@
   }
 
   private LogicalInput createInputFromDescriptor(InputDescriptor inputDesc) {
-    Input input = RuntimeUtils.createClazzInstance(inputDesc.getClassName());
+    Input input = ReflectionUtils.createClazzInstance(inputDesc.getClassName());
     if (!(input instanceof LogicalInput)) {
       throw new TezUncheckedException(inputDesc.getClass().getName()
           + " is not a sub-type of LogicalInput."
@@ -514,7 +514,7 @@
   
   private LogicalOutput createOutput(OutputSpec outputSpec) {
     LOG.info("Creating Output");
-    Output output = RuntimeUtils.createClazzInstance(outputSpec
+    Output output = ReflectionUtils.createClazzInstance(outputSpec
         .getOutputDescriptor().getClassName());
     if (!(output instanceof LogicalOutput)) {
       throw new TezUncheckedException(output.getClass().getName()
@@ -526,7 +526,7 @@
 
   private LogicalIOProcessor createProcessor(
       ProcessorDescriptor processorDescriptor) {
-    Processor processor = RuntimeUtils.createClazzInstance(processorDescriptor
+    Processor processor = ReflectionUtils.createClazzInstance(processorDescriptor
         .getClassName());
     if (!(processor instanceof LogicalIOProcessor)) {
       throw new TezUncheckedException(processor.getClass().getName()
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
index c024d0a..6d2f852 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -30,7 +30,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.TezEntityDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -65,9 +65,9 @@
   private final List<RequestorInfo> requestList;
 
   /**
-   * @param numInputs
+   * @param numTotalInputs
    *          total number of Inputs for the task
-   * @param numOutputs
+   * @param numTotalOutputs
    *          total number of Outputs for the task
    * @param conf
    *          Tez specific task configuration
@@ -122,7 +122,7 @@
       String allocatorClassName = conf.get(TezJobConfig.TEZ_RUNTIME_SCALE_TASK_MEMORY_ALLOCATOR_CLASS,
           TezJobConfig.TEZ_RUNTIME_SCALE_TASK_MEMORY_ALLOCATOR_CLASS_DEFAULT);
       LOG.info("Using Allocator class: " + allocatorClassName);
-      InitialMemoryAllocator allocator = RuntimeUtils.createClazzInstance(allocatorClassName);
+      InitialMemoryAllocator allocator = ReflectionUtils.createClazzInstance(allocatorClassName);
       allocator.setConf(conf);
       allocations = allocator.assignMemory(totalJvmMemory, numTotalInputs, numTotalOutputs,
           Iterables.unmodifiableIterable(requestContexts));
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestReflectionUtils.java
similarity index 92%
rename from tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java
rename to tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestReflectionUtils.java
index f374cc0..d10bd29 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestReflectionUtils.java
@@ -29,11 +29,11 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezException;
 import org.junit.Test;
 
-public class TestRuntimeUtils {
+public class TestReflectionUtils {
 
   @Test
   public void testAddResourceToClasspath() throws IOException, TezException {
@@ -57,7 +57,7 @@
       urlForm = urlForm.substring(0, urlForm.lastIndexOf('/') + 1);
       URL url = new URL(urlForm);
 
-      RuntimeUtils.addResourcesToClasspath(Collections.singletonList(url));
+      ReflectionUtils.addResourcesToClasspath(Collections.singletonList(url));
 
       loadedUrl = Thread.currentThread().getContextClassLoader().getResource(rsrcName);
 
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index e37bbe3..ac88865 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -25,6 +25,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
@@ -90,7 +91,7 @@
     }
     if (shufflePayload.hasEmptyPartitions()) {
       try {
-        byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
+        byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
         BitSet emptyPartitionsBitSet = TezUtils.fromByteArray(emptyPartitions);
         if (emptyPartitionsBitSet.get(partitionId)) {
           LOG.info("Source partition: " + partitionId + " did not generate any data. SrcAttempt: ["
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 2d51a7d..4e3e94b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
@@ -477,7 +478,7 @@
     }
     if (emptyPartitions.cardinality() != 0) {
       // Empty partitions exist
-      ByteString emptyPartitionsByteString = TezUtils.compressByteArrayToByteString(TezUtils
+      ByteString emptyPartitionsByteString = TezCommonUtils.compressByteArrayToByteString(TezUtils
           .toByteArray(emptyPartitions));
       payloadBuidler.setEmptyPartitions(emptyPartitionsByteString);
     }
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 8fbac92..c1f5ddb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
@@ -165,7 +166,7 @@
       }
       if (emptyPartitions > 0) {
         ByteString emptyPartitionsBytesString =
-            TezUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitionDetails));
+            TezCommonUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitionDetails));
         payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
         LOG.info("EmptyPartition bitsetSize=" + emptyPartitionDetails.cardinality() + ", numOutputs="
                 + getNumPhysicalOutputs() + ", emptyPartitions=" + emptyPartitions
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 646ddbd..8cd47c1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
@@ -134,7 +135,7 @@
       BitSet emptyPartitions = new BitSet();
       emptyPartitions.set(0);
       ByteString emptyPartitionsBytesString =
-          TezUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitions));
+          TezCommonUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitions));
       payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
     }
     if (outputGenerated) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index 628298a..65dba32 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -26,6 +26,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
@@ -101,7 +102,7 @@
         + stringify(shufflePayload));
 
     if (shufflePayload.hasEmptyPartitions()) {
-      byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload
+      byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
           .getEmptyPartitions());
       BitSet emptyPartionsBitSet = TezUtils.fromByteArray(emptyPartitions);
       if (emptyPartionsBitSet.get(srcIndex)) {
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index f45a7d4..5c3a83a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -26,7 +26,7 @@
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeManager;
 import org.apache.tez.dag.api.EdgeManagerContext;
@@ -125,7 +125,7 @@
           newEdgeManagers.clear();
           for (Entry<String, EdgeManagerDescriptor> entry :
               ((Map<String, EdgeManagerDescriptor>)invocation.getArguments()[2]).entrySet()) {
-            EdgeManager edgeManager = RuntimeUtils.createClazzInstance(
+            EdgeManager edgeManager = ReflectionUtils.createClazzInstance(
                 entry.getValue().getClassName());
             final byte[] userPayload = entry.getValue().getUserPayload();
             edgeManager.initialize(new EdgeManagerContext() {
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index f5fd276..9b37c78 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -57,6 +57,7 @@
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
@@ -316,7 +317,7 @@
     BitSet emptyPartitionBits = null;
     if (partitionsWithData.cardinality() != numPartitions) {
       assertTrue(eventProto.hasEmptyPartitions());
-      byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(eventProto
+      byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto
           .getEmptyPartitions());
       emptyPartitionBits = TezUtils.fromByteArray(emptyPartitions);
       assertEquals(numPartitions - partitionsWithData.cardinality(),
@@ -505,7 +506,7 @@
       emptyPartitionBits = new BitSet(numPartitions);
     } else {
       assertTrue(eventProto.hasEmptyPartitions());
-      byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(eventProto
+      byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto
           .getEmptyPartitions());
       emptyPartitionBits = TezUtils.fromByteArray(emptyPartitions);
       if (numRecordsWritten == 0) {
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
index e95c30d..2aa7396 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
@@ -27,6 +27,7 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezInputContext;
@@ -151,7 +152,7 @@
     for (int i : emptyPartitions) {
       bitSet.set(i);
     }
-    ByteString emptyPartitionsBytesString = TezUtils.compressByteArrayToByteString(TezUtils
+    ByteString emptyPartitionsBytesString = TezCommonUtils.compressByteArrayToByteString(TezUtils
         .toByteArray(bitSet));
     return emptyPartitionsBytesString;
   }
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index e5667c7..ea0378d 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -67,7 +67,7 @@
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezAppMasterStatus;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -671,7 +671,7 @@
           .getConfigurationBytes());
 
       try {
-        RuntimeUtils.getClazz(RELOCALIZATION_TEST_CLASS_NAME);
+        ReflectionUtils.getClazz(RELOCALIZATION_TEST_CLASS_NAME);
         LOG.info("Class found");
         FileSystem fs = FileSystem.get(conf);
         fs.mkdirs(new Path("/tmp/relocalizationfilefound"));