TEZ-1119. Support display of user payloads in Tez UI. (hitesh)
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index db434ae..c15324c 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -19,7 +19,11 @@
package org.apache.tez.common;
import java.io.IOException;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -32,6 +36,8 @@
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
+import com.google.protobuf.ByteString;
+
public class TezCommonUtils {
public static final FsPermission TEZ_AM_DIR_PERMISSION = FsPermission
.createImmutable((short) 0700); // rwx--------
@@ -211,8 +217,6 @@
*
* @param recoveryPath
* TEZ recovery directory used for Tez internals
- * @param conf
- * Tez configuration
* @param attemptID
* Application Attempt Id
* @return App attempt specific recovery path
@@ -283,4 +287,23 @@
public static FSDataOutputStream createFileForAM(FileSystem fs, Path filePath) throws IOException {
return FileSystem.create(fs, filePath, new FsPermission(TEZ_AM_FILE_PERMISSION));
}
+
+ @Private
+ public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException {
+ ByteString.Output os = ByteString.newOutput();
+ DeflaterOutputStream compressOs = new DeflaterOutputStream(os, new Deflater(
+ Deflater.BEST_COMPRESSION));
+ compressOs.write(inBytes);
+ compressOs.finish();
+ ByteString byteString = os.toByteString();
+ return byteString;
+ }
+
+ @Private
+ public static byte[] decompressByteStringToByteArray(ByteString byteString) throws IOException {
+ InflaterInputStream in = new InflaterInputStream(byteString.newInput());
+ byte[] bytes = IOUtils.toByteArray(in);
+ return bytes;
+ }
+
}
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 87573f3..87592e7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.client.PreWarmContext;
import org.apache.tez.client.TezAppMasterStatus;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUserPayload;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
@@ -277,9 +278,30 @@
builder
.setUserPayload(ByteString.copyFrom(descriptor.getUserPayload()));
}
+ if (descriptor.getHistoryText() != null) {
+ try {
+ builder.setHistoryText(TezCommonUtils.compressByteArrayToByteString(
+ descriptor.getHistoryText().getBytes()));
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
return builder.build();
}
+ public static String getHistoryTextFromProto(TezEntityDescriptorProto proto) {
+ if (!proto.hasHistoryText()) {
+ return null;
+ }
+ try {
+ return new String(TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText()));
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
+
+
+
public static RootInputLeafOutputProto convertToDAGPlan(
RootInputLeafOutput<? extends TezEntityDescriptor> descriptor) {
RootInputLeafOutputProto.Builder builder = RootInputLeafOutputProto.newBuilder();
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
index cb5e84f..194492f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
@@ -29,4 +29,16 @@
this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
return this;
}
+
+ /**
+ * Provide a human-readable version of the user payload that can be
+ * used in the History UI
+ * @param historyText History text
+ */
+ @Override
+ public EdgeManagerDescriptor setHistoryText(String historyText) {
+ super.setHistoryText(historyText);
+ return this;
+ }
+
}
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
index cc6948c..0d01262 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
@@ -33,7 +33,19 @@
@Override
public InputDescriptor setUserPayload(byte[] userPayload) {
- this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+ super.setUserPayload(userPayload);
return this;
}
+
+ /**
+ * Provide a human-readable version of the user payload that can be
+ * used in the History UI
+ * @param historyText History text
+ */
+ @Override
+ public InputDescriptor setHistoryText(String historyText) {
+ super.setHistoryText(historyText);
+ return this;
+ }
+
}
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
index a34d35c..e9cbe51 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
@@ -33,7 +33,19 @@
@Override
public OutputDescriptor setUserPayload(byte[] userPayload) {
- this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+ super.setUserPayload(userPayload);
return this;
}
+
+ /**
+ * Provide a human-readable version of the user payload that can be
+ * used in the History UI
+ * @param historyText History text
+ */
+ @Override
+ public OutputDescriptor setHistoryText(String historyText) {
+ super.setHistoryText(historyText);
+ return this;
+ }
+
}
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
index a0e574d..4641d93 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
@@ -31,8 +31,21 @@
super(processorClassName);
}
+ @Override
public ProcessorDescriptor setUserPayload(byte[] userPayload) {
- this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+ super.setUserPayload(userPayload);
return this;
}
+
+ /**
+ * Provide a human-readable version of the user payload that can be
+ * used in the History UI
+ * @param historyText History text
+ */
+ @Override
+ public ProcessorDescriptor setHistoryText(String historyText) {
+ super.setHistoryText(historyText);
+ return this;
+ }
+
}
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
index 25788ff..1047b09 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
@@ -31,6 +31,7 @@
protected TezUserPayload userPayload;
private String className;
+ protected String historyText;
@Private // for Writable
public TezEntityDescriptor() {
@@ -44,11 +45,31 @@
return (userPayload == null) ? null : userPayload.getPayload();
}
+ /**
+ * Set user payload for this entity descriptor
+ * @param userPayload User Payload
+ * @return
+ */
public TezEntityDescriptor setUserPayload(byte[] userPayload) {
this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
return this;
}
+ /**
+ * Provide a human-readable version of the user payload that can be
+ * used in the History UI
+ * @param historyText History text
+ */
+ public TezEntityDescriptor setHistoryText(String historyText) {
+ this.historyText = historyText;
+ return this;
+ }
+
+ @Private // Internal use only
+ public String getHistoryText() {
+ return this.historyText;
+ }
+
public String getClassName() {
return this.className;
}
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
index 58980b5..3e72523 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginDescriptor.java
@@ -36,4 +36,16 @@
this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
return this;
}
+
+ /**
+ * Provide a human-readable version of the user payload that can be
+ * used in the History UI
+ * @param historyText History text
+ */
+ @Override
+ public VertexManagerPluginDescriptor setHistoryText(String historyText) {
+ super.setHistoryText(historyText);
+ return this;
+ }
+
}
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index c7a317e..9fa9cf2 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -113,6 +113,7 @@
message TezEntityDescriptorProto {
optional string class_name = 1;
optional bytes user_payload = 2;
+ optional bytes history_text = 3;
}
message RootInputLeafOutputProto {
diff --git a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
index 121c673..f52f69b 100644
--- a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.tez.common;
import java.io.File;
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
new file mode 100644
index 0000000..ed2d8bd
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.io.IOException;
+
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDagTypeConverters {
+
+ @Test
+ public void testTezEntityDescriptorSerialization() throws IOException {
+ byte[] payload = new String("Foobar").getBytes();
+ String historytext = "Bar123";
+ TezEntityDescriptor entityDescriptor =
+ new InputDescriptor("inputClazz").setUserPayload(payload)
+ .setHistoryText(historytext);
+ TezEntityDescriptorProto proto =
+ DagTypeConverters.convertToDAGPlan(entityDescriptor);
+ Assert.assertArrayEquals(payload, proto.getUserPayload().toByteArray());
+ Assert.assertTrue(proto.hasHistoryText());
+ Assert.assertNotEquals(historytext, proto.getHistoryText());
+ Assert.assertEquals(historytext, new String(
+ TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText())));
+
+ // Ensure that the history text is not deserialized
+ InputDescriptor inputDescriptor =
+ DagTypeConverters.convertInputDescriptorFromDAGPlan(proto);
+ Assert.assertNull(inputDescriptor.getHistoryText());
+ }
+
+}
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
index 43e9fa7..7deed48 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java
@@ -1,5 +1,22 @@
-package org.apache.tez.dag.api.client.rpc;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api.client.rpc;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.*;
@@ -43,7 +60,6 @@
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
-
public class TestDAGClient {
private DAGClientRPCImpl dagClient;
diff --git a/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java b/tez-common/src/main/java/org/apache/tez/common/ReflectionUtils.java
similarity index 99%
rename from tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java
rename to tez-common/src/main/java/org/apache/tez/common/ReflectionUtils.java
index d32f3cd..3b9ac81 100644
--- a/tez-common/src/main/java/org/apache/tez/common/RuntimeUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/ReflectionUtils.java
@@ -30,7 +30,7 @@
import org.apache.tez.dag.api.TezUncheckedException;
@Private
-public class RuntimeUtils {
+public class ReflectionUtils {
private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index cfa3413..f73f223 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -229,24 +229,6 @@
return output;
}
- @Private
- public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException {
- ByteString.Output os = ByteString.newOutput();
- DeflaterOutputStream compressOs = new DeflaterOutputStream(os, new Deflater(
- Deflater.BEST_COMPRESSION));
- compressOs.write(inBytes);
- compressOs.finish();
- ByteString byteString = os.toByteString();
- return byteString;
- }
-
- @Private
- public static byte[] decompressByteStringToByteArray(ByteString byteString) throws IOException {
- InflaterInputStream in = new InflaterInputStream(byteString.newInput());
- byte[] bytes = IOUtils.toByteArray(in);
- return bytes;
- }
-
private static final Pattern pattern = Pattern.compile("\\W");
@Private
public static final int MAX_VERTEX_NAME_LENGTH = 40;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index cbd48fa..6dca990 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.tez.dag.api.client;
import java.util.Collections;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index c4a1085..3d20881 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -33,7 +33,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
@@ -101,7 +101,7 @@
String className = input.getInitializerClassName();
@SuppressWarnings("unchecked")
Class<? extends TezRootInputInitializer> clazz =
- (Class<? extends TezRootInputInitializer>) RuntimeUtils
+ (Class<? extends TezRootInputInitializer>) ReflectionUtils
.getClazz(className);
TezRootInputInitializer initializer = null;
try {
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index a6004cf..e2a9a27 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -25,7 +25,7 @@
import javax.annotation.Nullable;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUserPayload;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManager;
@@ -125,7 +125,7 @@
case CUSTOM:
if (edgeProperty.getEdgeManagerDescriptor() != null) {
String edgeManagerClassName = edgeProperty.getEdgeManagerDescriptor().getClassName();
- edgeManager = RuntimeUtils.createClazzInstance(edgeManagerClassName);
+ edgeManager = ReflectionUtils.createClazzInstance(edgeManagerClassName);
}
break;
default:
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a68093a..3162d6c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -54,7 +54,7 @@
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
@@ -1657,7 +1657,7 @@
dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
- OutputCommitter outputCommitter = RuntimeUtils.createClazzInstance(
+ OutputCommitter outputCommitter = ReflectionUtils.createClazzInstance(
od.getInitializerClassName());
OutputCommitterContext outputCommitterContext =
new OutputCommitterContextImpl(appContext.getApplicationID(),
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 35c3943..3eb9ca1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -32,7 +32,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUserPayload;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DagTypeConverters;
@@ -235,7 +235,7 @@
public void initialize() {
pluginContext = new VertexManagerPluginContextImpl();
if (pluginDesc != null) {
- plugin = RuntimeUtils.createClazzInstance(pluginDesc.getClassName());
+ plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName());
payload = DagTypeConverters.convertToTezUserPayload(pluginDesc.getUserPayload());
}
if (payload == null || payload.getPayload() == null) {
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 82e063a..e8e45a3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -24,7 +24,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
@@ -57,7 +57,7 @@
TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
historyLoggingService =
- RuntimeUtils.createClazzInstance(historyServiceClassName);
+ ReflectionUtils.createClazzInstance(historyServiceClassName);
historyLoggingService.setAppContext(context);
addService(historyLoggingService);
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 3997c2f..b58ee9b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.history.utils;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
@@ -26,6 +27,8 @@
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
import org.apache.tez.dag.app.dag.impl.VertexStats;
@@ -35,125 +38,58 @@
public class DAGUtils {
+ static final String DAG_NAME_KEY = "dagName";
+ static final String VERTICES_KEY = "vertices";
+ static final String EDGES_KEY = "edges";
+ static final String VERTEX_GROUPS_KEY = "vertexGroups";
+
+ static final String VERTEX_NAME_KEY = "vertexName";
+ static final String PROCESSOR_CLASS_KEY = "processorClass";
+ static final String IN_EDGE_IDS_KEY = "inEdgeIds";
+ static final String OUT_EDGE_IDS_KEY = "outEdgeIds";
+ static final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
+ static final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
+ static final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
+ "vertexManagerPluginClass";
+ static final String USER_PAYLOAD_AS_TEXT = "userPayloadAsText";
+ static final String OUTPUT_USER_PAYLOAD_AS_TEXT = "outputUserPayloadAsText";
+ static final String INPUT_USER_PAYLOAD_AS_TEXT = "inputUserPayloadAsText";
+
+ static final String EDGE_ID_KEY = "edgeId";
+ static final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
+ static final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
+ static final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
+ static final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
+ static final String SCHEDULING_TYPE_KEY = "schedulingType";
+ static final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
+ static final String EDGE_DESTINATION_CLASS_KEY =
+ "edgeDestinationClass";
+
+ static final String NAME_KEY = "name";
+ static final String CLASS_KEY = "class";
+ static final String INITIALIZER_KEY = "initializer";
+
+ static final String VERTEX_GROUP_NAME_KEY = "groupName";
+ static final String VERTEX_GROUP_MEMBERS_KEY = "groupMembers";
+ static final String VERTEX_GROUP_OUTPUTS_KEY = "outputs";
+ static final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
+ static final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";
+
+
+
public static JSONObject generateSimpleJSONPlan(DAGProtos.DAGPlan dagPlan) throws JSONException {
-
- final String DAG_NAME_KEY = "dagName";
- final String VERTICES_KEY = "vertices";
- final String EDGES_KEY = "edges";
-
- final String VERTEX_NAME_KEY = "vertexName";
- final String PROCESSOR_CLASS_KEY = "processorClass";
- final String IN_EDGE_IDS_KEY = "inEdgeIds";
- final String OUT_EDGE_IDS_KEY = "outEdgeIds";
- final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
- final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
- final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
- "vertexManagerPluginClass";
-
- final String EDGE_ID_KEY = "edgeId";
- final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
- final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
- final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
- final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
- final String SCHEDULING_TYPE_KEY = "schedulingType";
- final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
- final String EDGE_DESTINATION_CLASS_KEY =
- "edgeDestinationClass";
-
- final String NAME_KEY = "name";
- final String CLASS_KEY = "class";
- final String INITIALIZER_KEY = "initializer";
-
- JSONObject dagJson = new JSONObject();
- dagJson.put(DAG_NAME_KEY, dagPlan.getName());
- for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
- JSONObject vertexJson = new JSONObject();
- vertexJson.put(VERTEX_NAME_KEY, vertexPlan.getName());
-
- if (vertexPlan.hasProcessorDescriptor()) {
- vertexJson.put(PROCESSOR_CLASS_KEY,
- vertexPlan.getProcessorDescriptor().getClassName());
- }
-
- for (String inEdgeId : vertexPlan.getInEdgeIdList()) {
- vertexJson.accumulate(IN_EDGE_IDS_KEY, inEdgeId);
- }
- for (String outEdgeId : vertexPlan.getOutEdgeIdList()) {
- vertexJson.accumulate(OUT_EDGE_IDS_KEY, outEdgeId);
- }
-
- for (DAGProtos.RootInputLeafOutputProto input :
- vertexPlan.getInputsList()) {
- JSONObject jsonInput = new JSONObject();
- jsonInput.put(NAME_KEY, input.getName());
- jsonInput.put(CLASS_KEY, input.getEntityDescriptor().getClassName());
- if (input.hasInitializerClassName()) {
- jsonInput.put(INITIALIZER_KEY, input.getInitializerClassName());
- }
- vertexJson.accumulate(ADDITIONAL_INPUTS_KEY, jsonInput);
- }
-
- for (DAGProtos.RootInputLeafOutputProto output :
- vertexPlan.getOutputsList()) {
- JSONObject jsonOutput = new JSONObject();
- jsonOutput.put(NAME_KEY, output.getName());
- jsonOutput.put(CLASS_KEY, output.getEntityDescriptor().getClassName());
- if (output.hasInitializerClassName()) {
- jsonOutput.put(INITIALIZER_KEY, output.getInitializerClassName());
- }
- vertexJson.accumulate(ADDITIONAL_OUTPUTS_KEY, jsonOutput);
- }
-
- if (vertexPlan.hasVertexManagerPlugin()) {
- vertexJson.put(VERTEX_MANAGER_PLUGIN_CLASS_KEY,
- vertexPlan.getVertexManagerPlugin().getClassName());
- }
-
- dagJson.accumulate(VERTICES_KEY, vertexJson);
+ JSONObject dagJson;
+ try {
+ dagJson = new JSONObject(convertDAGPlanToATSMap(dagPlan));
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
}
-
- for (DAGProtos.EdgePlan edgePlan : dagPlan.getEdgeList()) {
- JSONObject edgeJson = new JSONObject();
- edgeJson.put(EDGE_ID_KEY, edgePlan.getId());
- edgeJson.put(INPUT_VERTEX_NAME_KEY, edgePlan.getInputVertexName());
- edgeJson.put(OUTPUT_VERTEX_NAME_KEY, edgePlan.getOutputVertexName());
- edgeJson.put(DATA_MOVEMENT_TYPE_KEY,
- edgePlan.getDataMovementType().name());
- edgeJson.put(DATA_SOURCE_TYPE_KEY, edgePlan.getDataSourceType().name());
- edgeJson.put(SCHEDULING_TYPE_KEY, edgePlan.getSchedulingType().name());
- edgeJson.put(EDGE_SOURCE_CLASS_KEY,
- edgePlan.getEdgeSource().getClassName());
- edgeJson.put(EDGE_DESTINATION_CLASS_KEY,
- edgePlan.getEdgeDestination().getClassName());
-
- dagJson.accumulate(EDGES_KEY, edgeJson);
- }
-
return dagJson;
}
public static JSONObject convertCountersToJSON(TezCounters counters)
throws JSONException {
- JSONObject jsonObject = new JSONObject();
- if (counters == null) {
- return jsonObject;
- }
-
- for (CounterGroup group : counters) {
- JSONObject jsonCGrp = new JSONObject();
- jsonCGrp.put(ATSConstants.COUNTER_GROUP_NAME, group.getName());
- jsonCGrp.put(ATSConstants.COUNTER_GROUP_DISPLAY_NAME,
- group.getDisplayName());
- for (TezCounter counter : group) {
- JSONObject counterJson = new JSONObject();
- counterJson.put(ATSConstants.COUNTER_NAME, counter.getName());
- counterJson.put(ATSConstants.COUNTER_DISPLAY_NAME,
- counter.getDisplayName());
- counterJson.put(ATSConstants.COUNTER_VALUE, counter.getValue());
- jsonCGrp.accumulate(ATSConstants.COUNTERS, counterJson);
- }
- jsonObject.accumulate(ATSConstants.COUNTER_GROUPS, jsonCGrp);
- }
+ JSONObject jsonObject = new JSONObject(convertCountersToATSMap(counters));
return jsonObject;
}
@@ -185,44 +121,10 @@
}
public static Map<String,Object> convertDAGPlanToATSMap(
- DAGProtos.DAGPlan dagPlan) {
+ DAGProtos.DAGPlan dagPlan) throws IOException {
final String VERSION_KEY = "version";
final int version = 1;
- final String DAG_NAME_KEY = "dagName";
- final String VERTICES_KEY = "vertices";
- final String EDGES_KEY = "edges";
- final String VERTEX_GROUPS_KEY = "vertexGroups";
-
- final String VERTEX_NAME_KEY = "vertexName";
- final String PROCESSOR_CLASS_KEY = "processorClass";
- final String IN_EDGE_IDS_KEY = "inEdgeIds";
- final String OUT_EDGE_IDS_KEY = "outEdgeIds";
- final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
- final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
- final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
- "vertexManagerPluginClass";
-
- final String EDGE_ID_KEY = "edgeId";
- final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
- final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
- final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
- final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
- final String SCHEDULING_TYPE_KEY = "schedulingType";
- final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
- final String EDGE_DESTINATION_CLASS_KEY =
- "edgeDestinationClass";
-
- final String NAME_KEY = "name";
- final String CLASS_KEY = "class";
- final String INITIALIZER_KEY = "initializer";
-
- final String VERTEX_GROUP_NAME_KEY = "groupName";
- final String VERTEX_GROUP_MEMBERS_KEY = "groupMembers";
- final String VERTEX_GROUP_OUTPUTS_KEY = "outputs";
- final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
- final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";
-
Map<String,Object> dagMap = new LinkedHashMap<String, Object>();
dagMap.put(DAG_NAME_KEY, dagPlan.getName());
dagMap.put(VERSION_KEY, version);
@@ -234,6 +136,11 @@
if (vertexPlan.hasProcessorDescriptor()) {
vertexMap.put(PROCESSOR_CLASS_KEY,
vertexPlan.getProcessorDescriptor().getClassName());
+ if (vertexPlan.getProcessorDescriptor().hasHistoryText()) {
+ vertexMap.put(USER_PAYLOAD_AS_TEXT,
+ DagTypeConverters.getHistoryTextFromProto(
+ vertexPlan.getProcessorDescriptor()));
+ }
}
ArrayList<Object> inEdgeIdList = new ArrayList<Object>();
@@ -253,6 +160,11 @@
if (input.hasInitializerClassName()) {
inputMap.put(INITIALIZER_KEY, input.getInitializerClassName());
}
+ if (input.getEntityDescriptor().hasHistoryText()) {
+ inputMap.put(USER_PAYLOAD_AS_TEXT,
+ DagTypeConverters.getHistoryTextFromProto(
+ input.getEntityDescriptor()));
+ }
inputsList.add(inputMap);
}
putInto(vertexMap, ADDITIONAL_INPUTS_KEY, inputsList);
@@ -266,6 +178,11 @@
if (output.hasInitializerClassName()) {
outputMap.put(INITIALIZER_KEY, output.getInitializerClassName());
}
+ if (output.getEntityDescriptor().hasHistoryText()) {
+ outputMap.put(USER_PAYLOAD_AS_TEXT,
+ DagTypeConverters.getHistoryTextFromProto(
+ output.getEntityDescriptor()));
+ }
outputsList.add(outputMap);
}
putInto(vertexMap, ADDITIONAL_OUTPUTS_KEY, outputsList);
@@ -293,7 +210,16 @@
edgePlan.getEdgeSource().getClassName());
edgeMap.put(EDGE_DESTINATION_CLASS_KEY,
edgePlan.getEdgeDestination().getClassName());
-
+ if (edgePlan.getEdgeSource().hasHistoryText()) {
+ edgeMap.put(OUTPUT_USER_PAYLOAD_AS_TEXT,
+ DagTypeConverters.getHistoryTextFromProto(
+ edgePlan.getEdgeSource()));
+ }
+ if (edgePlan.getEdgeDestination().hasHistoryText()) {
+ edgeMap.put(INPUT_USER_PAYLOAD_AS_TEXT,
+ DagTypeConverters.getHistoryTextFromProto(
+ edgePlan.getEdgeDestination()));
+ }
edgesList.add(edgeMap);
}
putInto(dagMap, EDGES_KEY, edgesList);
@@ -321,6 +247,11 @@
&& edgeMergedInputInfo.getMergedInput().hasClassName()) {
edgeMergedInput.put(PROCESSOR_CLASS_KEY,
edgeMergedInputInfo.getMergedInput().getClassName());
+ if (edgeMergedInputInfo.getMergedInput().hasHistoryText()) {
+ edgeMergedInput.put(USER_PAYLOAD_AS_TEXT,
+ DagTypeConverters.getHistoryTextFromProto(
+ edgeMergedInputInfo.getMergedInput()));
+ }
}
edgeMergedInputs.add(edgeMergedInput);
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java
index 6539944..37b49d8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/JavaProfilerOptions.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.tez.dag.utils;
import java.util.BitSet;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
index aae82f8..bcd65e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
@@ -32,7 +32,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezException;
import com.google.common.collect.Lists;
@@ -55,7 +55,7 @@
}
public static void addUrlsToClassPath(List<URL> urls) {
- RuntimeUtils.addResourcesToSystemClassLoader(urls);
+ ReflectionUtils.addResourcesToSystemClassLoader(urls);
}
private static Path downloadResource(String destName, URI uri, Configuration conf)
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
index 8b888ff..1146ce4 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/ErrorReporter.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.tez.runtime.task;
public interface ErrorReporter {
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
index 009df3c..4fb563c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.tez.dag.api.client;
import static org.junit.Assert.*;
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index ed91e20..5c79f22 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -25,7 +25,7 @@
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.VertexLocationHint;
@@ -76,7 +76,7 @@
event.toProtoStream(os);
os.flush();
os.close();
- deserializedEvent = RuntimeUtils.createClazzInstance(
+ deserializedEvent = ReflectionUtils.createClazzInstance(
event.getClass().getName());
LOG.info("Serialized event to byte array"
+ ", eventType=" + event.getEventType()
@@ -100,7 +100,7 @@
SummaryEventProto summaryEventProto =
SummaryEventProto.parseDelimitedFrom(
new ByteArrayInputStream(os.toByteArray()));
- deserializedEvent = RuntimeUtils.createClazzInstance(
+ deserializedEvent = ReflectionUtils.createClazzInstance(
event.getClass().getName());
((SummaryEvent)deserializedEvent).fromSummaryProtoStream(summaryEventProto);
return deserializedEvent;
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
new file mode 100644
index 0000000..f926471
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.utils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.GroupInputEdge;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.runtime.api.OutputCommitter;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestDAGUtils {
+
+ private DAGPlan createDAG() {
+ // Create a plan with 3 vertices: A, B, C. Group(A,B)->C
+ Configuration conf = new Configuration(false);
+ int dummyTaskCount = 1;
+ Resource dummyTaskResource = Resource.newInstance(1, 1);
+ org.apache.tez.dag.api.Vertex v1 = new org.apache.tez.dag.api.Vertex("vertex1",
+ new ProcessorDescriptor("Processor").setHistoryText("vertex1 Processor HistoryText"),
+ dummyTaskCount, dummyTaskResource);
+ v1.addInput("input1", new InputDescriptor("input.class").setHistoryText("input HistoryText"),
+ null);
+ org.apache.tez.dag.api.Vertex v2 = new org.apache.tez.dag.api.Vertex("vertex2",
+ new ProcessorDescriptor("Processor").setHistoryText("vertex2 Processor HistoryText"),
+ dummyTaskCount, dummyTaskResource);
+ org.apache.tez.dag.api.Vertex v3 = new org.apache.tez.dag.api.Vertex("vertex3",
+ new ProcessorDescriptor("Processor").setHistoryText("vertex3 Processor HistoryText"),
+ dummyTaskCount, dummyTaskResource);
+
+ DAG dag = new DAG("testDag");
+ String groupName1 = "uv12";
+ org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
+ OutputDescriptor outDesc = new OutputDescriptor("output.class")
+ .setHistoryText("uvOut HistoryText");
+ uv12.addOutput("uvOut", outDesc, OutputCommitter.class);
+ v3.addOutput("uvOut", outDesc, OutputCommitter.class);
+
+ GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
+ new EdgeProperty(DataMovementType.SCATTER_GATHER,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ new OutputDescriptor("dummy output class").setHistoryText("Dummy History Text"),
+ new InputDescriptor("dummy input class").setHistoryText("Dummy History Text")),
+ new InputDescriptor("merge.class").setHistoryText("Merge HistoryText"));
+
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addVertex(v3);
+ dag.addEdge(e1);
+ return dag.createDag(conf);
+ }
+
+ @Test
+ public void testConvertDAGPlanToATSMap() throws IOException, JSONException {
+ DAGPlan dagPlan = createDAG();
+ Map<String, Object> atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan);
+ Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY));
+ Assert.assertEquals(dagPlan.getName(), atsMap.get(DAGUtils.DAG_NAME_KEY));
+ Assert.assertTrue(atsMap.containsKey("version"));
+ Assert.assertEquals(1, atsMap.get("version"));
+ Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTICES_KEY));
+ Assert.assertTrue(atsMap.containsKey(DAGUtils.EDGES_KEY));
+ Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTEX_GROUPS_KEY));
+
+ Assert.assertEquals(3, ((Collection<?>) atsMap.get(DAGUtils.VERTICES_KEY)).size());
+ Set<String> vNames = Sets.newHashSet("vertex1", "vertex2", "vertex3");
+
+ Set<String> inEdgeIds = new HashSet<String>();
+ Set<String> outEdgeIds = new HashSet<String>();
+
+ int additionalInputCount = 0;
+ int additionalOutputCount = 0;
+
+ for (Object o : ((Collection<?>) atsMap.get(DAGUtils.VERTICES_KEY))) {
+ Map<String, Object> v = (Map<String, Object>) o;
+ Assert.assertTrue(v.containsKey(DAGUtils.VERTEX_NAME_KEY));
+ Assert.assertTrue(v.containsKey(DAGUtils.PROCESSOR_CLASS_KEY));
+ Assert.assertTrue(v.containsKey(DAGUtils.USER_PAYLOAD_AS_TEXT));
+
+ if (v.containsKey(DAGUtils.IN_EDGE_IDS_KEY)) {
+ inEdgeIds.addAll(((Collection<String>) v.get(DAGUtils.IN_EDGE_IDS_KEY)));
+ }
+ if (v.containsKey(DAGUtils.OUT_EDGE_IDS_KEY)) {
+ outEdgeIds.addAll(((Collection<String>) v.get(DAGUtils.OUT_EDGE_IDS_KEY)));
+ }
+
+ String vName = (String) v.get(DAGUtils.VERTEX_NAME_KEY);
+ Assert.assertTrue(vNames.contains(vName));
+ String procPayload = vName + " Processor HistoryText";
+ Assert.assertEquals(procPayload, v.get(DAGUtils.USER_PAYLOAD_AS_TEXT));
+
+ if (v.containsKey(DAGUtils.ADDITIONAL_INPUTS_KEY)) {
+ additionalInputCount += ((Collection<?>) v.get(DAGUtils.ADDITIONAL_INPUTS_KEY)).size();
+ for (Object input : ((Collection<?>) v.get(DAGUtils.ADDITIONAL_INPUTS_KEY))) {
+ Map<String, Object> inputMap = (Map<String, Object>) input;
+ Assert.assertTrue(inputMap.containsKey(DAGUtils.NAME_KEY));
+ Assert.assertTrue(inputMap.containsKey(DAGUtils.CLASS_KEY));
+ Assert.assertFalse(inputMap.containsKey(DAGUtils.INITIALIZER_KEY));
+ Assert.assertEquals("input HistoryText", inputMap.get(DAGUtils.USER_PAYLOAD_AS_TEXT));
+ }
+ }
+
+ if (v.containsKey(DAGUtils.ADDITIONAL_OUTPUTS_KEY)) {
+ additionalOutputCount += ((Collection<?>) v.get(DAGUtils.ADDITIONAL_OUTPUTS_KEY)).size();
+ for (Object output : ((Collection<?>) v.get(DAGUtils.ADDITIONAL_OUTPUTS_KEY))) {
+ Map<String, Object> outputMap = (Map<String, Object>) output;
+ Assert.assertTrue(outputMap.containsKey(DAGUtils.NAME_KEY));
+ Assert.assertTrue(outputMap.containsKey(DAGUtils.CLASS_KEY));
+ Assert.assertTrue(outputMap.containsKey(DAGUtils.INITIALIZER_KEY));
+ Assert.assertEquals("uvOut HistoryText", outputMap.get(DAGUtils.USER_PAYLOAD_AS_TEXT));
+ }
+ }
+ }
+
+ // 1 input
+ Assert.assertEquals(1, additionalInputCount);
+ // 3 outputs due to vertex group
+ Assert.assertEquals(3, additionalOutputCount);
+
+ // 1 edge translates to 2 due to vertex group
+ Assert.assertEquals(2, inEdgeIds.size());
+ Assert.assertEquals(2, outEdgeIds.size());
+
+ for (Object o : ((Collection<?>) atsMap.get(DAGUtils.EDGES_KEY))) {
+ Map<String, Object> e = (Map<String, Object>) o;
+
+ Assert.assertTrue(inEdgeIds.contains(e.get(DAGUtils.EDGE_ID_KEY)));
+ Assert.assertTrue(outEdgeIds.contains(e.get(DAGUtils.EDGE_ID_KEY)));
+ Assert.assertTrue(e.containsKey(DAGUtils.INPUT_VERTEX_NAME_KEY));
+ Assert.assertTrue(e.containsKey(DAGUtils.OUTPUT_VERTEX_NAME_KEY));
+ Assert.assertEquals(DataMovementType.SCATTER_GATHER.name(),
+ e.get(DAGUtils.DATA_MOVEMENT_TYPE_KEY));
+ Assert.assertEquals(DataSourceType.PERSISTED.name(), e.get(DAGUtils.DATA_SOURCE_TYPE_KEY));
+ Assert.assertEquals(SchedulingType.SEQUENTIAL.name(), e.get(DAGUtils.SCHEDULING_TYPE_KEY));
+ Assert.assertEquals("dummy output class", e.get(DAGUtils.EDGE_SOURCE_CLASS_KEY));
+ Assert.assertEquals("dummy input class", e.get(DAGUtils.EDGE_DESTINATION_CLASS_KEY));
+ Assert.assertEquals("Dummy History Text", e.get(DAGUtils.OUTPUT_USER_PAYLOAD_AS_TEXT));
+ Assert.assertEquals("Dummy History Text", e.get(DAGUtils.INPUT_USER_PAYLOAD_AS_TEXT));
+ }
+
+ for (Object o : ((Collection<?>) atsMap.get(DAGUtils.VERTEX_GROUPS_KEY))) {
+ Map<String, Object> e = (Map<String, Object>) o;
+ Assert.assertEquals("uv12", e.get(DAGUtils.VERTEX_GROUP_NAME_KEY));
+ Assert.assertTrue(e.containsKey(DAGUtils.VERTEX_GROUP_MEMBERS_KEY));
+ Assert.assertTrue(e.containsKey(DAGUtils.VERTEX_GROUP_OUTPUTS_KEY));
+ Assert.assertTrue(e.containsKey(DAGUtils.VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY));
+ }
+ }
+
+}
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 1c6d45e..051bfee 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -18,6 +18,7 @@
package org.apache.tez.mapreduce.examples;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -198,12 +199,16 @@
List<Vertex> vertices = new ArrayList<Vertex>();
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
+ mapStageConf.writeXml(outputStream);
+ String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload,
TextInputFormat.class.getName());
int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
- MapProcessor.class.getName()).setUserPayload(mapPayload),
+ MapProcessor.class.getName()).setUserPayload(mapPayload)
+ .setHistoryText(mapStageHistoryText),
numMaps, MRHelpers.getMapResource(mapStageConf));
if (generateSplitsInClient) {
mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
@@ -222,19 +227,27 @@
MRHelpers.addMRInput(mapVertex, mapInputPayload, initializerClazz);
vertices.add(mapVertex);
+ ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
+ iReduceStageConf.writeXml(iROutputStream);
+ String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
- ReduceProcessor.class.getName()).
- setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf)),
- 2,
- MRHelpers.getReduceResource(iReduceStageConf));
+ ReduceProcessor.class.getName())
+ .setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf))
+ .setHistoryText(iReduceStageHistoryText),
+ 2, MRHelpers.getReduceResource(iReduceStageConf));
ivertex.setTaskLocalFiles(commonLocalResources);
vertices.add(ivertex);
+ ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
+ finalReduceConf.writeXml(finalReduceOutputStream);
+ String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
Vertex finalReduceVertex = new Vertex("finalreduce",
new ProcessorDescriptor(
- ReduceProcessor.class.getName()).setUserPayload(finalReducePayload),
- 1, MRHelpers.getReduceResource(finalReduceConf));
+ ReduceProcessor.class.getName())
+ .setUserPayload(finalReducePayload)
+ .setHistoryText(finalReduceStageHistoryText), 1,
+ MRHelpers.getReduceResource(finalReduceConf));
finalReduceVertex.setTaskLocalFiles(commonLocalResources);
MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload);
vertices.add(finalReduceVertex);
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index 6e94c22..76da547 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -29,8 +29,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import com.google.common.base.Preconditions;
@@ -85,7 +84,7 @@
Class<? extends InputFormat> clazz = (Class<? extends InputFormat>)
getClassFromName(split.wrappedInputFormatName);
try {
- wrappedInputFormat = ReflectionUtils.newInstance(clazz, conf);
+ wrappedInputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
} catch (Exception e) {
throw new TezUncheckedException(e);
}
@@ -93,7 +92,7 @@
}
static Class<?> getClassFromName(String name) {
- return RuntimeUtils.getClazz(name);
+ return ReflectionUtils.getClazz(name);
}
public class TezGroupedSplitsRecordReader implements RecordReader<K, V> {
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
index 5fa3e79..3fd0b6e 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java
@@ -31,8 +31,7 @@
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import com.google.common.base.Preconditions;
@@ -120,7 +119,7 @@
Class<? extends InputFormat> clazz = (Class<? extends InputFormat>)
getClassFromName(split.wrappedInputFormatName);
try {
- wrappedInputFormat = ReflectionUtils.newInstance(clazz, conf);
+ wrappedInputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
} catch (Exception e) {
throw new TezUncheckedException(e);
}
@@ -128,7 +127,7 @@
}
static Class<?> getClassFromName(String name) {
- return RuntimeUtils.getClazz(name);
+ return ReflectionUtils.getClazz(name);
}
public class TezGroupedSplitsRecordReader extends RecordReader<K, V> {
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 70f56d4..f078f1d 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -18,8 +18,11 @@
package org.apache.tez.dag.history.logging.ats;
+import java.io.IOException;
+
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.AMLaunchedEvent;
@@ -277,8 +280,12 @@
atsEntity.addPrimaryFilter(ATSConstants.USER, event.getUser());
atsEntity.addPrimaryFilter(ATSConstants.DAG_NAME, event.getDAGName());
- atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
- DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+ try {
+ atsEntity.addOtherInfo(ATSConstants.DAG_PLAN,
+ DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan()));
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
atsEntity.addOtherInfo(ATSConstants.APPLICATION_ID,
event.getApplicationAttemptId().getApplicationId().toString());
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 881ae90..122cc23 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -42,7 +42,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -503,7 +503,7 @@
}
private LogicalInput createInputFromDescriptor(InputDescriptor inputDesc) {
- Input input = RuntimeUtils.createClazzInstance(inputDesc.getClassName());
+ Input input = ReflectionUtils.createClazzInstance(inputDesc.getClassName());
if (!(input instanceof LogicalInput)) {
throw new TezUncheckedException(inputDesc.getClass().getName()
+ " is not a sub-type of LogicalInput."
@@ -514,7 +514,7 @@
private LogicalOutput createOutput(OutputSpec outputSpec) {
LOG.info("Creating Output");
- Output output = RuntimeUtils.createClazzInstance(outputSpec
+ Output output = ReflectionUtils.createClazzInstance(outputSpec
.getOutputDescriptor().getClassName());
if (!(output instanceof LogicalOutput)) {
throw new TezUncheckedException(output.getClass().getName()
@@ -526,7 +526,7 @@
private LogicalIOProcessor createProcessor(
ProcessorDescriptor processorDescriptor) {
- Processor processor = RuntimeUtils.createClazzInstance(processorDescriptor
+ Processor processor = ReflectionUtils.createClazzInstance(processorDescriptor
.getClassName());
if (!(processor instanceof LogicalIOProcessor)) {
throw new TezUncheckedException(processor.getClass().getName()
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
index c024d0a..6d2f852 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -30,7 +30,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.TezEntityDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -65,9 +65,9 @@
private final List<RequestorInfo> requestList;
/**
- * @param numInputs
+ * @param numTotalInputs
* total number of Inputs for the task
- * @param numOutputs
+ * @param numTotalOutputs
* total number of Outputs for the task
* @param conf
* Tez specific task configuration
@@ -122,7 +122,7 @@
String allocatorClassName = conf.get(TezJobConfig.TEZ_RUNTIME_SCALE_TASK_MEMORY_ALLOCATOR_CLASS,
TezJobConfig.TEZ_RUNTIME_SCALE_TASK_MEMORY_ALLOCATOR_CLASS_DEFAULT);
LOG.info("Using Allocator class: " + allocatorClassName);
- InitialMemoryAllocator allocator = RuntimeUtils.createClazzInstance(allocatorClassName);
+ InitialMemoryAllocator allocator = ReflectionUtils.createClazzInstance(allocatorClassName);
allocator.setConf(conf);
allocations = allocator.assignMemory(totalJvmMemory, numTotalInputs, numTotalOutputs,
Iterables.unmodifiableIterable(requestContexts));
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestReflectionUtils.java
similarity index 92%
rename from tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java
rename to tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestReflectionUtils.java
index f374cc0..d10bd29 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestRuntimeUtils.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestReflectionUtils.java
@@ -29,11 +29,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezException;
import org.junit.Test;
-public class TestRuntimeUtils {
+public class TestReflectionUtils {
@Test
public void testAddResourceToClasspath() throws IOException, TezException {
@@ -57,7 +57,7 @@
urlForm = urlForm.substring(0, urlForm.lastIndexOf('/') + 1);
URL url = new URL(urlForm);
- RuntimeUtils.addResourcesToClasspath(Collections.singletonList(url));
+ ReflectionUtils.addResourcesToClasspath(Collections.singletonList(url));
loadedUrl = Thread.currentThread().getContextClassLoader().getResource(rsrcName);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index e37bbe3..ac88865 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
@@ -90,7 +91,7 @@
}
if (shufflePayload.hasEmptyPartitions()) {
try {
- byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
+ byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
BitSet emptyPartitionsBitSet = TezUtils.fromByteArray(emptyPartitions);
if (emptyPartitionsBitSet.get(partitionId)) {
LOG.info("Source partition: " + partitionId + " did not generate any data. SrcAttempt: ["
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 2d51a7d..4e3e94b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
@@ -477,7 +478,7 @@
}
if (emptyPartitions.cardinality() != 0) {
// Empty partitions exist
- ByteString emptyPartitionsByteString = TezUtils.compressByteArrayToByteString(TezUtils
+ ByteString emptyPartitionsByteString = TezCommonUtils.compressByteArrayToByteString(TezUtils
.toByteArray(emptyPartitions));
payloadBuidler.setEmptyPartitions(emptyPartitionsByteString);
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 8fbac92..c1f5ddb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtils;
@@ -165,7 +166,7 @@
}
if (emptyPartitions > 0) {
ByteString emptyPartitionsBytesString =
- TezUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitionDetails));
+ TezCommonUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitionDetails));
payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
LOG.info("EmptyPartition bitsetSize=" + emptyPartitionDetails.cardinality() + ", numOutputs="
+ getNumPhysicalOutputs() + ", emptyPartitions=" + emptyPartitions
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 646ddbd..8cd47c1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtils;
@@ -134,7 +135,7 @@
BitSet emptyPartitions = new BitSet();
emptyPartitions.set(0);
ByteString emptyPartitionsBytesString =
- TezUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitions));
+ TezCommonUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitions));
payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
}
if (outputGenerated) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index 628298a..65dba32 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -26,6 +26,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
@@ -101,7 +102,7 @@
+ stringify(shufflePayload));
if (shufflePayload.hasEmptyPartitions()) {
- byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload
+ byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload
.getEmptyPartitions());
BitSet emptyPartionsBitSet = TezUtils.fromByteArray(emptyPartitions);
if (emptyPartionsBitSet.get(srcIndex)) {
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index f45a7d4..5c3a83a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -26,7 +26,7 @@
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeManager;
import org.apache.tez.dag.api.EdgeManagerContext;
@@ -125,7 +125,7 @@
newEdgeManagers.clear();
for (Entry<String, EdgeManagerDescriptor> entry :
((Map<String, EdgeManagerDescriptor>)invocation.getArguments()[2]).entrySet()) {
- EdgeManager edgeManager = RuntimeUtils.createClazzInstance(
+ EdgeManager edgeManager = ReflectionUtils.createClazzInstance(
entry.getValue().getClassName());
final byte[] userPayload = entry.getValue().getUserPayload();
edgeManager.initialize(new EdgeManagerContext() {
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index f5fd276..9b37c78 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -57,6 +57,7 @@
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtils;
@@ -316,7 +317,7 @@
BitSet emptyPartitionBits = null;
if (partitionsWithData.cardinality() != numPartitions) {
assertTrue(eventProto.hasEmptyPartitions());
- byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(eventProto
+ byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto
.getEmptyPartitions());
emptyPartitionBits = TezUtils.fromByteArray(emptyPartitions);
assertEquals(numPartitions - partitionsWithData.cardinality(),
@@ -505,7 +506,7 @@
emptyPartitionBits = new BitSet(numPartitions);
} else {
assertTrue(eventProto.hasEmptyPartitions());
- byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(eventProto
+ byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto
.getEmptyPartitions());
emptyPartitionBits = TezUtils.fromByteArray(emptyPartitions);
if (numRecordsWritten == 0) {
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
index e95c30d..2aa7396 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestShuffleInputEventHandlerImpl.java
@@ -27,6 +27,7 @@
import java.util.LinkedList;
import java.util.List;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
@@ -151,7 +152,7 @@
for (int i : emptyPartitions) {
bitSet.set(i);
}
- ByteString emptyPartitionsBytesString = TezUtils.compressByteArrayToByteString(TezUtils
+ ByteString emptyPartitionsBytesString = TezCommonUtils.compressByteArrayToByteString(TezUtils
.toByteArray(bitSet));
return emptyPartitionsBytesString;
}
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index e5667c7..ea0378d 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -67,7 +67,7 @@
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezAppMasterStatus;
-import org.apache.tez.common.RuntimeUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
@@ -671,7 +671,7 @@
.getConfigurationBytes());
try {
- RuntimeUtils.getClazz(RELOCALIZATION_TEST_CLASS_NAME);
+ ReflectionUtils.getClazz(RELOCALIZATION_TEST_CLASS_NAME);
LOG.info("Class found");
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(new Path("/tmp/relocalizationfilefound"));