| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.history.parser.datamodel; |
| |
| import org.apache.tez.common.Preconditions; |
| import com.google.common.base.Predicate; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.LinkedHashMultimap; |
| import com.google.common.collect.LinkedListMultimap; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Multimap; |
| import com.google.common.collect.Multimaps; |
| import com.google.common.collect.Ordering; |
| import org.apache.commons.collections4.BidiMap; |
| import org.apache.commons.collections4.bidimap.DualHashBidiMap; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.util.StringInterner; |
| import org.apache.tez.client.CallerContext; |
| import org.apache.tez.dag.api.event.VertexState; |
| import org.apache.tez.dag.history.HistoryEventType; |
| import org.codehaus.jettison.json.JSONArray; |
| import org.codehaus.jettison.json.JSONException; |
| import org.codehaus.jettison.json.JSONObject; |
| |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.Map; |
| |
| import static org.apache.hadoop.classification.InterfaceAudience.Public; |
| import static org.apache.hadoop.classification.InterfaceStability.Evolving; |
| |
| @Public |
| @Evolving |
| public class DagInfo extends BaseInfo { |
| |
| private static final Log LOG = LogFactory.getLog(DagInfo.class); |
| |
| //Fields populated via JSON |
| private final String name; |
| private final long startTime; |
| private final long endTime; |
| private final long submitTime; |
| private final int failedTasks; |
| private final String dagId; |
| private final int numVertices; |
| private final String status; |
| private final String diagnostics; |
| private String userName; |
| private VersionInfo versionInfo; |
| private CallerContext callerContext; |
| |
| //VertexID --> VertexName & vice versa |
| private final BidiMap<String, String> vertexNameIDMapping; |
| |
| //edgeId to EdgeInfo mapping |
| private final Map<Integer, EdgeInfo> edgeInfoMap; |
| |
| //Only for internal parsing (vertexname mapping) |
| private Map<String, BasicVertexInfo> basicVertexInfoMap; |
| |
| //VertexName --> VertexInfo |
| private Map<String, VertexInfo> vertexNameMap; |
| |
| private Multimap<Container, TaskAttemptInfo> containerMapping; |
| private Map<String, String> config; |
| |
| DagInfo(JSONObject jsonObject) throws JSONException { |
| super(jsonObject); |
| |
| vertexNameMap = Maps.newHashMap(); |
| vertexNameIDMapping = new DualHashBidiMap<>(); |
| edgeInfoMap = Maps.newHashMap(); |
| basicVertexInfoMap = Maps.newHashMap(); |
| containerMapping = LinkedHashMultimap.create(); |
| |
| Preconditions.checkArgument(jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase |
| (Constants.TEZ_DAG_ID)); |
| |
| dagId = StringInterner.weakIntern(jsonObject.getString(Constants.ENTITY)); |
| |
| //Parse additional Info |
| JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO); |
| |
| long sTime = otherInfoNode.optLong(Constants.START_TIME); |
| long eTime= otherInfoNode.optLong(Constants.FINISH_TIME); |
| userName = otherInfoNode.optString(Constants.USER); |
| if (eTime < sTime) { |
| LOG.warn("DAG has got wrong start/end values. " |
| + "startTime=" + sTime + ", endTime=" + eTime + ". Will check " |
| + "timestamps in DAG started/finished events"); |
| |
| // Check if events DAG_STARTED, DAG_FINISHED can be made use of |
| for(Event event : eventList) { |
| switch (HistoryEventType.valueOf(event.getType())) { |
| case DAG_STARTED: |
| sTime = event.getAbsoluteTime(); |
| break; |
| case DAG_FINISHED: |
| eTime = event.getAbsoluteTime(); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| if (eTime < sTime) { |
| LOG.warn("DAG has got wrong start/end values in events as well. " |
| + "startTime=" + sTime + ", endTime=" + eTime); |
| } |
| } |
| startTime = sTime; |
| endTime = eTime; |
| |
| //TODO: Not getting populated correctly for lots of jobs. Verify |
| submitTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME); |
| diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS); |
| failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS); |
| JSONObject dagPlan = otherInfoNode.optJSONObject(Constants.DAG_PLAN); |
| name = StringInterner.weakIntern((dagPlan != null) ? (dagPlan.optString(Constants.DAG_NAME)) : null); |
| if (dagPlan != null) { |
| JSONArray vertices = dagPlan.optJSONArray(Constants.VERTICES); |
| if (vertices != null) { |
| numVertices = vertices.length(); |
| } else { |
| numVertices = 0; |
| } |
| parseDAGPlan(dagPlan); |
| } else { |
| numVertices = 0; |
| } |
| status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS)); |
| |
| //parse name id mapping |
| JSONObject vertexIDMappingJson = otherInfoNode.optJSONObject(Constants.VERTEX_NAME_ID_MAPPING); |
| if (vertexIDMappingJson != null) { |
| //get vertex name |
| for (Map.Entry<String, BasicVertexInfo> entry : basicVertexInfoMap.entrySet()) { |
| String vertexId = vertexIDMappingJson.optString(entry.getKey()); |
| //vertexName --> vertexId |
| vertexNameIDMapping.put(entry.getKey(), vertexId); |
| } |
| } |
| } |
| |
| public static DagInfo create(JSONObject jsonObject) throws JSONException { |
| DagInfo dagInfo = new DagInfo(jsonObject); |
| return dagInfo; |
| } |
| |
| private void parseDAGPlan(JSONObject dagPlan) throws JSONException { |
| int version = dagPlan.optInt(Constants.VERSION, 1); |
| parseEdges(dagPlan.optJSONArray(Constants.EDGES)); |
| |
| JSONArray verticesInfo = dagPlan.optJSONArray(Constants.VERTICES); |
| parseBasicVertexInfo(verticesInfo); |
| |
| if (version > 1) { |
| parseDAGContext(dagPlan.optJSONObject(Constants.DAG_CONTEXT)); |
| } |
| } |
| |
| private void parseDAGContext(JSONObject callerContextInfo) { |
| if (callerContextInfo == null) { |
| LOG.info("No DAG Caller Context available"); |
| return; |
| } |
| String context = callerContextInfo.optString(Constants.CONTEXT); |
| String callerId = callerContextInfo.optString(Constants.CALLER_ID); |
| String callerType = callerContextInfo.optString(Constants.CALLER_TYPE); |
| String description = callerContextInfo.optString(Constants.DESCRIPTION); |
| |
| this.callerContext = CallerContext.create(context, description); |
| if (callerId != null && !callerId.isEmpty() && callerType != null && !callerType.isEmpty()) { |
| this.callerContext.setCallerIdAndType(callerId, callerType); |
| } else { |
| LOG.info("No DAG Caller Context Id and Type available"); |
| } |
| |
| } |
| |
| private void parseBasicVertexInfo(JSONArray verticesInfo) throws JSONException { |
| if (verticesInfo == null) { |
| LOG.info("No vertices available."); |
| return; |
| } |
| |
| //Parse basic information available in DAG for vertex and edges |
| for (int i = 0; i < verticesInfo.length(); i++) { |
| BasicVertexInfo basicVertexInfo = new BasicVertexInfo(); |
| |
| JSONObject vJson = verticesInfo.getJSONObject(i); |
| basicVertexInfo.vertexName = |
| vJson.optString(Constants.VERTEX_NAME); |
| JSONArray inEdges = vJson.optJSONArray(Constants.IN_EDGE_IDS); |
| if (inEdges != null) { |
| String[] inEdgeIds = new String[inEdges.length()]; |
| for (int j = 0; j < inEdges.length(); j++) { |
| inEdgeIds[j] = inEdges.get(j).toString(); |
| } |
| basicVertexInfo.inEdgeIds = inEdgeIds; |
| } |
| |
| JSONArray outEdges = vJson.optJSONArray(Constants.OUT_EDGE_IDS); |
| if (outEdges != null) { |
| String[] outEdgeIds = new String[outEdges.length()]; |
| for (int j = 0; j < outEdges.length(); j++) { |
| outEdgeIds[j] = outEdges.get(j).toString(); |
| } |
| basicVertexInfo.outEdgeIds = outEdgeIds; |
| } |
| |
| JSONArray addInputsJson = |
| vJson.optJSONArray(Constants.ADDITIONAL_INPUTS); |
| basicVertexInfo.additionalInputs = parseAdditionalDetailsForVertex(addInputsJson); |
| |
| JSONArray addOutputsJson = |
| vJson.optJSONArray(Constants.ADDITIONAL_OUTPUTS); |
| basicVertexInfo.additionalOutputs = parseAdditionalDetailsForVertex(addOutputsJson); |
| |
| basicVertexInfoMap.put(basicVertexInfo.vertexName, basicVertexInfo); |
| } |
| } |
| |
| /** |
| * get additional details available for every vertex in the dag |
| * |
| * @param jsonArray |
| * @return AdditionalInputOutputDetails[] |
| * @throws JSONException |
| */ |
| private AdditionalInputOutputDetails[] parseAdditionalDetailsForVertex(JSONArray jsonArray) throws |
| JSONException { |
| if (jsonArray != null) { |
| AdditionalInputOutputDetails[] |
| additionalInputOutputDetails = new AdditionalInputOutputDetails[jsonArray.length()]; |
| for (int j = 0; j < jsonArray.length(); j++) { |
| String name = jsonArray.getJSONObject(j).optString( |
| Constants.NAME); |
| String clazz = jsonArray.getJSONObject(j).optString( |
| Constants.CLASS); |
| String initializer = |
| jsonArray.getJSONObject(j).optString(Constants.INITIALIZER); |
| String userPayloadText = jsonArray.getJSONObject(j).optString( |
| Constants.USER_PAYLOAD_TEXT); |
| |
| additionalInputOutputDetails[j] = |
| new AdditionalInputOutputDetails(name, clazz, initializer, userPayloadText); |
| |
| } |
| return additionalInputOutputDetails; |
| } |
| return null; |
| } |
| |
| /** |
| * Parse edge details in the DAG |
| * |
| * @param edgesArray |
| * |
| * @throws JSONException |
| */ |
| private void parseEdges(JSONArray edgesArray) throws JSONException { |
| if (edgesArray == null) { |
| return; |
| } |
| for (int i = 0; i < edgesArray.length(); i++) { |
| JSONObject edge = edgesArray.getJSONObject(i); |
| Integer edgeId = edge.optInt(Constants.EDGE_ID); |
| String inputVertexName = |
| edge.optString(Constants.INPUT_VERTEX_NAME); |
| String outputVertexName = |
| edge.optString(Constants.OUTPUT_VERTEX_NAME); |
| String dataMovementType = |
| edge.optString(Constants.DATA_MOVEMENT_TYPE); |
| String edgeSourceClass = |
| edge.optString(Constants.EDGE_SOURCE_CLASS); |
| String edgeDestinationClass = |
| edge.optString(Constants.EDGE_DESTINATION_CLASS); |
| String inputUserPayloadAsText = |
| edge.optString(Constants.INPUT_PAYLOAD_TEXT); |
| String outputUserPayloadAsText = |
| edge.optString(Constants.OUTPUT_PAYLOAD_TEXT); |
| EdgeInfo edgeInfo = new EdgeInfo(inputVertexName, outputVertexName, |
| dataMovementType, edgeSourceClass, edgeDestinationClass, inputUserPayloadAsText, |
| outputUserPayloadAsText); |
| edgeInfoMap.put(edgeId, edgeInfo); |
| } |
| } |
| |
| static class BasicVertexInfo { |
| String vertexName; |
| String[] inEdgeIds; |
| String[] outEdgeIds; |
| AdditionalInputOutputDetails[] additionalInputs; |
| AdditionalInputOutputDetails[] additionalOutputs; |
| } |
| |
| void addVertexInfo(VertexInfo vertexInfo) { |
| BasicVertexInfo basicVertexInfo = basicVertexInfoMap.get(vertexInfo.getVertexName()); |
| |
| Preconditions.checkArgument(basicVertexInfo != null, |
| "VerteName " + vertexInfo.getVertexName() |
| + " not present in DAG's vertices " + basicVertexInfoMap.entrySet()); |
| |
| //populate additional information in VertexInfo |
| if (basicVertexInfo.additionalInputs != null) { |
| vertexInfo.setAdditionalInputInfoList(Arrays.asList(basicVertexInfo.additionalInputs)); |
| } |
| if (basicVertexInfo.additionalOutputs != null) { |
| vertexInfo.setAdditionalOutputInfoList(Arrays.asList(basicVertexInfo.additionalOutputs)); |
| } |
| |
| //Populate edge information in vertex |
| if (basicVertexInfo.inEdgeIds != null) { |
| for (String edge : basicVertexInfo.inEdgeIds) { |
| EdgeInfo edgeInfo = edgeInfoMap.get(Integer.parseInt(edge)); |
| Preconditions.checkState(edgeInfo != null, "EdgeId " + edge + " not present in DAG"); |
| vertexInfo.addInEdge(edgeInfo); |
| } |
| } |
| |
| if (basicVertexInfo.outEdgeIds != null) { |
| for (String edge : basicVertexInfo.outEdgeIds) { |
| EdgeInfo edgeInfo = edgeInfoMap.get(Integer.parseInt(edge)); |
| Preconditions.checkState(edgeInfo != null, "EdgeId " + edge + " not present in DAG"); |
| vertexInfo.addOutEdge(edgeInfo); |
| } |
| } |
| |
| vertexNameMap.put(vertexInfo.getVertexName(), vertexInfo); |
| } |
| |
| void setAppConfig(Map<String, String> config) { |
| this.config = config; |
| } |
| |
| public Map<String, String> getAppConfig() { |
| return (config != null) ? Collections.unmodifiableMap(config) : null; |
| } |
| |
| void setVersionInfo(VersionInfo versionInfo) { |
| this.versionInfo = versionInfo; |
| } |
| |
| void addContainerMapping(Container container, TaskAttemptInfo taskAttemptInfo) { |
| this.containerMapping.put(container, taskAttemptInfo); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("["); |
| sb.append("dagID=").append(getDagId()).append(", "); |
| sb.append("dagName=").append(getName()).append(", "); |
| sb.append("status=").append(getStatus()).append(", "); |
| sb.append("startTime=").append(getStartTimeInterval()).append(", "); |
| sb.append("submitTime=").append(getSubmitTime()).append(", "); |
| sb.append("endTime=").append(getFinishTimeInterval()).append(", "); |
| sb.append("timeTaken=").append(getTimeTaken()).append(", "); |
| sb.append("diagnostics=").append(getDiagnostics()).append(", "); |
| sb.append("vertexNameIDMapping=").append(getVertexNameIDMapping()).append(", "); |
| sb.append("failedTasks=").append(getFailedTaskCount()).append(", "); |
| sb.append("events=").append(getEvents()).append(", "); |
| sb.append("status=").append(getStatus()); |
| sb.append("]"); |
| return sb.toString(); |
| } |
| |
| public Multimap<Container, TaskAttemptInfo> getContainerMapping() { |
| return Multimaps.unmodifiableMultimap(containerMapping); |
| } |
| |
| public final VersionInfo getVersionInfo() { |
| return versionInfo; |
| } |
| |
| public final CallerContext getCallerContext() { |
| return callerContext; |
| } |
| |
| public final String getName() { |
| return name; |
| } |
| |
| public final Collection<EdgeInfo> getEdges() { |
| return Collections.unmodifiableCollection(edgeInfoMap.values()); |
| } |
| |
| public final long getSubmitTime() { |
| return submitTime; |
| } |
| |
| public final long getStartTime() { |
| return startTime; |
| } |
| |
| public final long getFinishTime() { |
| return endTime; |
| } |
| |
| /** |
| * Reference start time for the DAG. Vertex, Task, TaskAttempt would map on to this. |
| * If absolute start time is needed, call getAbsStartTime(). |
| * |
| * @return starting time w.r.t to dag |
| */ |
| public final long getStartTimeInterval() { |
| return 0; |
| } |
| |
| @Override |
| public final long getFinishTimeInterval() { |
| long dagEndTime = (endTime - startTime); |
| if (dagEndTime < 0) { |
| //probably dag is not complete or failed in middle. get the last task attempt time |
| for (VertexInfo vertexInfo : getVertices()) { |
| dagEndTime = (vertexInfo.getFinishTimeInterval() > dagEndTime) ? vertexInfo.getFinishTimeInterval() : dagEndTime; |
| } |
| } |
| return dagEndTime; |
| } |
| |
| public final long getTimeTaken() { |
| return getFinishTimeInterval(); |
| } |
| |
| public final String getStatus() { |
| return status; |
| } |
| |
| /** |
| * Get vertexInfo for a given vertexid |
| * |
| * @param vertexId |
| * @return VertexInfo |
| */ |
| public VertexInfo getVertexFromId(String vertexId) { |
| return vertexNameMap.get(vertexNameIDMapping.getKey(vertexId)); |
| } |
| |
| /** |
| * Get vertexInfo for a given vertex name |
| * |
| * @param vertexName |
| * @return VertexInfo |
| */ |
| public final VertexInfo getVertex(String vertexName) { |
| return vertexNameMap.get(vertexName); |
| } |
| |
| public final String getDiagnostics() { |
| return diagnostics; |
| } |
| |
| /** |
| * Get all vertices |
| * |
| * @return List<VertexInfo> |
| */ |
| public final List<VertexInfo> getVertices() { |
| List<VertexInfo> vertices = Lists.newLinkedList(vertexNameMap.values()); |
| Collections.sort(vertices, new Comparator<VertexInfo>() { |
| |
| @Override public int compare(VertexInfo o1, VertexInfo o2) { |
| return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 : |
| ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? |
| 0 : 1); |
| } |
| }); |
| return Collections.unmodifiableList(vertices); |
| } |
| |
| /** |
| * Get list of failed vertices |
| * |
| * @return List<VertexInfo> |
| */ |
| public final List<VertexInfo> getFailedVertices() { |
| return getVertices(VertexState.FAILED); |
| } |
| |
| /** |
| * Get list of killed vertices |
| * |
| * @return List<VertexInfo> |
| */ |
| public final List<VertexInfo> getKilledVertices() { |
| return getVertices(VertexState.KILLED); |
| } |
| |
| /** |
| * Get list of failed vertices |
| * |
| * @return List<VertexInfo> |
| */ |
| public final List<VertexInfo> getSuccessfullVertices() { |
| return getVertices(VertexState.SUCCEEDED); |
| } |
| |
| /** |
| * Get list of vertices belonging to a specific state |
| * |
| * @param state |
| * @return Collection<VertexInfo> |
| */ |
| public final List<VertexInfo> getVertices(final VertexState state) { |
| return Collections.unmodifiableList(Lists.newLinkedList(Iterables.filter(Lists.newLinkedList |
| (vertexNameMap.values()), new Predicate<VertexInfo>() { |
| @Override public boolean apply(VertexInfo input) { |
| return input.getStatus() != null && input.getStatus().equals(state.toString()); |
| } |
| } |
| ) |
| ) |
| ); |
| } |
| |
| public final Map<String, VertexInfo> getVertexMapping() { |
| return Collections.unmodifiableMap(vertexNameMap); |
| } |
| |
| private Ordering<VertexInfo> getVertexOrdering() { |
| return Ordering.from(new Comparator<VertexInfo>() { |
| @Override public int compare(VertexInfo o1, VertexInfo o2) { |
| return (o1.getTimeTaken() < o2.getTimeTaken()) ? -1 : |
| ((o1.getTimeTaken() == o2.getTimeTaken()) ? |
| 0 : 1); |
| } |
| }); |
| } |
| |
| /** |
| * Get the slowest vertex in the DAG |
| * |
| * @return VertexInfo |
| */ |
| public final VertexInfo getSlowestVertex() { |
| List<VertexInfo> vertexInfoList = getVertices(); |
| if (vertexInfoList.size() == 0) { |
| return null; |
| } |
| return getVertexOrdering().max(vertexInfoList); |
| } |
| |
| /** |
| * Get the slowest vertex in the DAG |
| * |
| * @return VertexInfo |
| */ |
| public final VertexInfo getFastestVertex() { |
| List<VertexInfo> vertexInfoList = getVertices(); |
| if (vertexInfoList.size() == 0) { |
| return null; |
| } |
| return getVertexOrdering().min(vertexInfoList); |
| } |
| |
| /** |
| * Get node details for this DAG. Would be useful for analyzing node to tasks. |
| * |
| * @return Multimap<String, TaskAttemptInfo> taskAttempt details at every node |
| */ |
| public final Multimap<String, TaskAttemptInfo> getNodeDetails() { |
| Multimap<String, TaskAttemptInfo> nodeDetails = LinkedListMultimap.create(); |
| for (VertexInfo vertexInfo : getVertices()) { |
| Multimap<Container, TaskAttemptInfo> containerMapping = vertexInfo.getContainersMapping(); |
| for (Map.Entry<Container, TaskAttemptInfo> entry : containerMapping.entries()) { |
| nodeDetails.put(entry.getKey().getHost(), entry.getValue()); |
| } |
| } |
| return nodeDetails; |
| } |
| |
| /** |
| * Get containers used for this DAG |
| * |
| * @return Multimap<Container, TaskAttemptInfo> task attempt details at every container |
| */ |
| public final Multimap<Container, TaskAttemptInfo> getContainersToTaskAttemptMapping() { |
| List<VertexInfo> VertexInfoList = getVertices(); |
| Multimap<Container, TaskAttemptInfo> containerMapping = LinkedHashMultimap.create(); |
| |
| for (VertexInfo vertexInfo : VertexInfoList) { |
| containerMapping.putAll(vertexInfo.getContainersMapping()); |
| } |
| return Multimaps.unmodifiableMultimap(containerMapping); |
| } |
| |
| public final Map<String, String> getVertexNameIDMapping() { |
| return vertexNameIDMapping; |
| } |
| |
| public final int getNumVertices() { |
| return numVertices; |
| } |
| |
| public final String getDagId() { |
| return dagId; |
| } |
| |
| public final int getFailedTaskCount() { |
| return failedTasks; |
| } |
| |
| public final String getUserName() { |
| return userName; |
| } |
| |
| final void setUserName(String userName) { |
| this.userName = userName; |
| } |
| |
| } |