blob: b58ee9b6f2f46e03fe10a0ba9180ba9e6b746a80 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.history.utils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.records.TezTaskID;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
/**
 * Static helpers that flatten DAG plans, counters and vertex statistics into
 * nested {@link Map} / collection structures, plus thin wrappers that build
 * {@link JSONObject}s from those maps.
 *
 * <p>All maps produced here are {@link LinkedHashMap}s, so keys keep the order
 * in which they were inserted. A key whose list value would be empty is left
 * out of the map entirely instead of being stored with an empty list.</p>
 */
public class DAGUtils {

  // Top-level keys of the DAG plan map.
  static final String DAG_NAME_KEY = "dagName";
  static final String VERSION_KEY = "version";
  static final String VERTICES_KEY = "vertices";
  static final String EDGES_KEY = "edges";
  static final String VERTEX_GROUPS_KEY = "vertexGroups";

  // Keys of a single vertex entry.
  static final String VERTEX_NAME_KEY = "vertexName";
  static final String PROCESSOR_CLASS_KEY = "processorClass";
  static final String IN_EDGE_IDS_KEY = "inEdgeIds";
  static final String OUT_EDGE_IDS_KEY = "outEdgeIds";
  static final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
  static final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
  static final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
      "vertexManagerPluginClass";
  static final String USER_PAYLOAD_AS_TEXT = "userPayloadAsText";
  static final String OUTPUT_USER_PAYLOAD_AS_TEXT = "outputUserPayloadAsText";
  static final String INPUT_USER_PAYLOAD_AS_TEXT = "inputUserPayloadAsText";

  // Keys of a single edge entry.
  static final String EDGE_ID_KEY = "edgeId";
  static final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
  static final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
  static final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
  static final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
  static final String SCHEDULING_TYPE_KEY = "schedulingType";
  static final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
  static final String EDGE_DESTINATION_CLASS_KEY =
      "edgeDestinationClass";

  // Keys of an additional input / output entry.
  static final String NAME_KEY = "name";
  static final String CLASS_KEY = "class";
  static final String INITIALIZER_KEY = "initializer";

  // Keys of a vertex group entry.
  static final String VERTEX_GROUP_NAME_KEY = "groupName";
  static final String VERTEX_GROUP_MEMBERS_KEY = "groupMembers";
  static final String VERTEX_GROUP_OUTPUTS_KEY = "outputs";
  static final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
  static final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";

  // Keys of the vertex statistics map.
  static final String FIRST_TASK_START_TIME_KEY = "firstTaskStartTime";
  static final String FIRST_TASKS_TO_START_KEY = "firstTasksToStart";
  static final String LAST_TASK_FINISH_TIME_KEY = "lastTaskFinishTime";
  static final String LAST_TASKS_TO_FINISH_KEY = "lastTasksToFinish";
  static final String MIN_TASK_DURATION = "minTaskDuration";
  static final String MAX_TASK_DURATION = "maxTaskDuration";
  static final String AVG_TASK_DURATION = "avgTaskDuration";
  static final String SHORTEST_DURATION_TASKS = "shortestDurationTasks";
  static final String LONGEST_DURATION_TASKS = "longestDurationTasks";

  /** Value stored under {@link #VERSION_KEY} in every DAG plan map. */
  private static final int PLAN_VERSION = 1;

  /**
   * Builds a JSON object from the map produced by
   * {@link #convertDAGPlanToATSMap(DAGProtos.DAGPlan)}.
   *
   * @param dagPlan the DAG plan to convert
   * @return the JSON form of the plan
   * @throws JSONException declared by the JSON object construction
   * @throws TezUncheckedException wrapping any {@link IOException} raised
   *         while the plan is converted to a map
   */
  public static JSONObject generateSimpleJSONPlan(DAGProtos.DAGPlan dagPlan) throws JSONException {
    try {
      return new JSONObject(convertDAGPlanToATSMap(dagPlan));
    } catch (IOException e) {
      throw new TezUncheckedException(e);
    }
  }

  /**
   * Builds a JSON object from the map produced by
   * {@link #convertCountersToATSMap(TezCounters)}.
   *
   * @param counters the counters to convert; may be {@code null}
   * @return the JSON form of the counters
   * @throws JSONException declared by the JSON object construction
   */
  public static JSONObject convertCountersToJSON(TezCounters counters)
      throws JSONException {
    return new JSONObject(convertCountersToATSMap(counters));
  }

  /**
   * Converts counters into a map holding one entry per counter group, each
   * carrying the group's name, display name and its list of counters
   * (name, display name, value).
   *
   * @param counters the counters to convert; may be {@code null}
   * @return the counters map; empty when {@code counters} is {@code null} or
   *         contains no groups
   */
  public static Map<String,Object> convertCountersToATSMap(TezCounters counters) {
    Map<String,Object> countersMap = new LinkedHashMap<String, Object>();
    if (counters == null) {
      return countersMap;
    }
    ArrayList<Object> groups = new ArrayList<Object>();
    for (CounterGroup group : counters) {
      groups.add(convertCounterGroup(group));
    }
    putInto(countersMap, ATSConstants.COUNTER_GROUPS, groups);
    return countersMap;
  }

  /** Converts one counter group, including all counters it iterates over. */
  private static Map<String,Object> convertCounterGroup(CounterGroup group) {
    Map<String,Object> groupMap = new LinkedHashMap<String, Object>();
    groupMap.put(ATSConstants.COUNTER_GROUP_NAME, group.getName());
    groupMap.put(ATSConstants.COUNTER_GROUP_DISPLAY_NAME,
        group.getDisplayName());
    ArrayList<Object> counterEntries = new ArrayList<Object>();
    for (TezCounter counter : group) {
      Map<String,Object> counterMap = new LinkedHashMap<String, Object>();
      counterMap.put(ATSConstants.COUNTER_NAME, counter.getName());
      counterMap.put(ATSConstants.COUNTER_DISPLAY_NAME,
          counter.getDisplayName());
      counterMap.put(ATSConstants.COUNTER_VALUE, counter.getValue());
      counterEntries.add(counterMap);
    }
    putInto(groupMap, ATSConstants.COUNTERS, counterEntries);
    return groupMap;
  }

  /**
   * Converts a DAG plan into a map containing the DAG name, a format version,
   * and - when non-empty - the lists of vertices, edges and vertex groups.
   *
   * @param dagPlan the DAG plan to convert
   * @return the plan as a nested map
   * @throws IOException propagated from the conversions delegated to
   *         {@link DagTypeConverters} (presumably the history-text decoding;
   *         confirm against that class)
   */
  public static Map<String,Object> convertDAGPlanToATSMap(
      DAGProtos.DAGPlan dagPlan) throws IOException {
    Map<String,Object> dagMap = new LinkedHashMap<String, Object>();
    dagMap.put(DAG_NAME_KEY, dagPlan.getName());
    dagMap.put(VERSION_KEY, PLAN_VERSION);

    ArrayList<Object> vertices = new ArrayList<Object>();
    for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
      vertices.add(convertVertexPlan(vertexPlan));
    }
    putInto(dagMap, VERTICES_KEY, vertices);

    ArrayList<Object> edges = new ArrayList<Object>();
    for (DAGProtos.EdgePlan edgePlan : dagPlan.getEdgeList()) {
      edges.add(convertEdgePlan(edgePlan));
    }
    putInto(dagMap, EDGES_KEY, edges);

    ArrayList<Object> vertexGroups = new ArrayList<Object>();
    for (DAGProtos.PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
      vertexGroups.add(convertVertexGroup(groupInfo));
    }
    putInto(dagMap, VERTEX_GROUPS_KEY, vertexGroups);

    return dagMap;
  }

  /** Converts a single vertex: name, processor, edge ids, extra inputs/outputs, manager plugin. */
  private static Map<String,Object> convertVertexPlan(
      DAGProtos.VertexPlan vertexPlan) throws IOException {
    Map<String,Object> vertexMap = new LinkedHashMap<String, Object>();
    vertexMap.put(VERTEX_NAME_KEY, vertexPlan.getName());

    if (vertexPlan.hasProcessorDescriptor()) {
      vertexMap.put(PROCESSOR_CLASS_KEY,
          vertexPlan.getProcessorDescriptor().getClassName());
      if (vertexPlan.getProcessorDescriptor().hasHistoryText()) {
        vertexMap.put(USER_PAYLOAD_AS_TEXT,
            DagTypeConverters.getHistoryTextFromProto(
                vertexPlan.getProcessorDescriptor()));
      }
    }

    putInto(vertexMap, IN_EDGE_IDS_KEY,
        new ArrayList<Object>(vertexPlan.getInEdgeIdList()));
    putInto(vertexMap, OUT_EDGE_IDS_KEY,
        new ArrayList<Object>(vertexPlan.getOutEdgeIdList()));

    putInto(vertexMap, ADDITIONAL_INPUTS_KEY,
        convertRootInputLeafOutputs(vertexPlan.getInputsList()));
    putInto(vertexMap, ADDITIONAL_OUTPUTS_KEY,
        convertRootInputLeafOutputs(vertexPlan.getOutputsList()));

    if (vertexPlan.hasVertexManagerPlugin()) {
      vertexMap.put(VERTEX_MANAGER_PLUGIN_CLASS_KEY,
          vertexPlan.getVertexManagerPlugin().getClassName());
    }
    return vertexMap;
  }

  /**
   * Converts a vertex's additional inputs or outputs. Both use the same proto
   * message and the same output keys, so one routine serves both lists.
   */
  private static ArrayList<Object> convertRootInputLeafOutputs(
      Iterable<DAGProtos.RootInputLeafOutputProto> entries) throws IOException {
    ArrayList<Object> converted = new ArrayList<Object>();
    for (DAGProtos.RootInputLeafOutputProto entry : entries) {
      Map<String,Object> entryMap = new LinkedHashMap<String, Object>();
      entryMap.put(NAME_KEY, entry.getName());
      entryMap.put(CLASS_KEY, entry.getEntityDescriptor().getClassName());
      if (entry.hasInitializerClassName()) {
        entryMap.put(INITIALIZER_KEY, entry.getInitializerClassName());
      }
      if (entry.getEntityDescriptor().hasHistoryText()) {
        entryMap.put(USER_PAYLOAD_AS_TEXT,
            DagTypeConverters.getHistoryTextFromProto(
                entry.getEntityDescriptor()));
      }
      converted.add(entryMap);
    }
    return converted;
  }

  /** Converts a single edge: id, endpoints, movement/source/scheduling types and endpoint classes. */
  private static Map<String,Object> convertEdgePlan(
      DAGProtos.EdgePlan edgePlan) throws IOException {
    Map<String,Object> edgeMap = new LinkedHashMap<String, Object>();
    edgeMap.put(EDGE_ID_KEY, edgePlan.getId());
    edgeMap.put(INPUT_VERTEX_NAME_KEY, edgePlan.getInputVertexName());
    edgeMap.put(OUTPUT_VERTEX_NAME_KEY, edgePlan.getOutputVertexName());
    edgeMap.put(DATA_MOVEMENT_TYPE_KEY,
        edgePlan.getDataMovementType().name());
    edgeMap.put(DATA_SOURCE_TYPE_KEY, edgePlan.getDataSourceType().name());
    edgeMap.put(SCHEDULING_TYPE_KEY, edgePlan.getSchedulingType().name());
    edgeMap.put(EDGE_SOURCE_CLASS_KEY,
        edgePlan.getEdgeSource().getClassName());
    edgeMap.put(EDGE_DESTINATION_CLASS_KEY,
        edgePlan.getEdgeDestination().getClassName());
    // The edge source's text is published as the "output" payload and the
    // edge destination's text as the "input" payload.
    if (edgePlan.getEdgeSource().hasHistoryText()) {
      edgeMap.put(OUTPUT_USER_PAYLOAD_AS_TEXT,
          DagTypeConverters.getHistoryTextFromProto(
              edgePlan.getEdgeSource()));
    }
    if (edgePlan.getEdgeDestination().hasHistoryText()) {
      edgeMap.put(INPUT_USER_PAYLOAD_AS_TEXT,
          DagTypeConverters.getHistoryTextFromProto(
              edgePlan.getEdgeDestination()));
    }
    return edgeMap;
  }

  /** Converts a single vertex group: name, members, outputs and edge merged inputs. */
  private static Map<String,Object> convertVertexGroup(
      DAGProtos.PlanVertexGroupInfo groupInfo) throws IOException {
    Map<String,Object> groupMap = new LinkedHashMap<String, Object>();
    groupMap.put(VERTEX_GROUP_NAME_KEY, groupInfo.getGroupName());

    // Copy the proto-owned lists so the map only holds plain collections,
    // the same way edge id lists are handled for vertices.
    putInto(groupMap, VERTEX_GROUP_MEMBERS_KEY,
        new ArrayList<Object>(groupInfo.getGroupMembersList()));
    putInto(groupMap, VERTEX_GROUP_OUTPUTS_KEY,
        new ArrayList<Object>(groupInfo.getOutputsList()));

    ArrayList<Object> mergedInputs = new ArrayList<Object>();
    for (PlanGroupInputEdgeInfo mergedInputInfo :
        groupInfo.getEdgeMergedInputsList()) {
      Map<String,Object> mergedInputMap = new LinkedHashMap<String, Object>();
      mergedInputMap.put(VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY,
          mergedInputInfo.getDestVertexName());
      if (mergedInputInfo.hasMergedInput()
          && mergedInputInfo.getMergedInput().hasClassName()) {
        // The merged input's class is published under the same key that
        // vertices use for their processor class.
        mergedInputMap.put(PROCESSOR_CLASS_KEY,
            mergedInputInfo.getMergedInput().getClassName());
        if (mergedInputInfo.getMergedInput().hasHistoryText()) {
          mergedInputMap.put(USER_PAYLOAD_AS_TEXT,
              DagTypeConverters.getHistoryTextFromProto(
                  mergedInputInfo.getMergedInput()));
        }
      }
      mergedInputs.add(mergedInputMap);
    }
    putInto(groupMap, VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY, mergedInputs);

    return groupMap;
  }

  /** Stores {@code values} under {@code key} unless the collection is empty. */
  private static void putInto(Map<String, Object> map, String key,
      Collection<?> values) {
    if (!values.isEmpty()) {
      map.put(key, values);
    }
  }

  /** Returns the {@code toString()} form of every task id, in iteration order. */
  private static ArrayList<String> convertToStringArrayList(
      Collection<TezTaskID> collection) {
    ArrayList<String> list = new ArrayList<String>(collection.size());
    for (TezTaskID t : collection) {
      list.add(t.toString());
    }
    return list;
  }

  /**
   * Stores the string forms of {@code taskIds} under {@code key}, skipping the
   * entry altogether when the collection is {@code null} or empty.
   */
  private static void putTaskIds(Map<String, Object> map, String key,
      Collection<TezTaskID> taskIds) {
    if (taskIds == null || taskIds.isEmpty()) {
      return;
    }
    map.put(key, convertToStringArrayList(taskIds));
  }

  /**
   * Converts vertex statistics into a map of start/finish times, min/max/avg
   * task durations and - when present and non-empty - the related task id
   * lists rendered as strings.
   *
   * @param vertexStats the statistics to convert; may be {@code null}
   * @return the statistics map; empty when {@code vertexStats} is {@code null}
   */
  public static Map<String,Object> convertVertexStatsToATSMap(
      VertexStats vertexStats) {
    Map<String,Object> vertexStatsMap = new LinkedHashMap<String, Object>();
    if (vertexStats == null) {
      return vertexStatsMap;
    }

    vertexStatsMap.put(FIRST_TASK_START_TIME_KEY, vertexStats.getFirstTaskStartTime());
    putTaskIds(vertexStatsMap, FIRST_TASKS_TO_START_KEY,
        vertexStats.getFirstTasksToStart());

    vertexStatsMap.put(LAST_TASK_FINISH_TIME_KEY, vertexStats.getLastTaskFinishTime());
    putTaskIds(vertexStatsMap, LAST_TASKS_TO_FINISH_KEY,
        vertexStats.getLastTasksToFinish());

    vertexStatsMap.put(MIN_TASK_DURATION, vertexStats.getMinTaskDuration());
    vertexStatsMap.put(MAX_TASK_DURATION, vertexStats.getMaxTaskDuration());
    vertexStatsMap.put(AVG_TASK_DURATION, vertexStats.getAvgTaskDuration());

    putTaskIds(vertexStatsMap, SHORTEST_DURATION_TASKS,
        vertexStats.getShortestDurationTasks());
    putTaskIds(vertexStatsMap, LONGEST_DURATION_TASKS,
        vertexStats.getLongestDurationTasks());

    return vertexStatsMap;
  }
}