blob: c45fcf5ff3dabdd2cfa5784d05c15212ee3dea5d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.history.parser.datamodel;
import org.apache.tez.common.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.history.parser.utils.Utils;
import org.apache.tez.util.StringInterner;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.apache.hadoop.classification.InterfaceStability.Evolving;
import static org.apache.hadoop.classification.InterfaceAudience.Public;
@Public
@Evolving
public class TaskAttemptInfo extends BaseInfo {
private static final Log LOG = LogFactory.getLog(TaskAttemptInfo.class);
private static final String SUCCEEDED = TaskAttemptState.SUCCEEDED.name();
private final String taskAttemptId;
private final long startTime;
private final long endTime;
private final String diagnostics;
private final long creationTime;
private final long allocationTime;
private final String containerId;
private final String nodeId;
private final String status;
private final String logUrl;
private final String creationCausalTA;
private final String terminationCause;
private final long executionTimeInterval;
// this list is in time order - array list for easy walking
private final ArrayList<DataDependencyEvent> lastDataEvents = Lists.newArrayList();
private TaskInfo taskInfo;
private Container container;
public static class DataDependencyEvent {
String taId;
long timestamp;
public DataDependencyEvent(String id, long time) {
taId = id;
timestamp = time;
}
public long getTimestamp() {
return timestamp;
}
public String getTaskAttemptId() {
return taId;
}
}
TaskAttemptInfo(JSONObject jsonObject) throws JSONException {
super(jsonObject);
Preconditions.checkArgument(
jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
(Constants.TEZ_TASK_ATTEMPT_ID));
taskAttemptId = StringInterner.intern(jsonObject.optString(Constants.ENTITY));
//Parse additional Info
final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
long sTime = otherInfoNode.optLong(Constants.START_TIME);
long eTime = otherInfoNode.optLong(Constants.FINISH_TIME);
if (eTime < sTime) {
LOG.warn("TaskAttemptInfo has got wrong start/end values. "
+ "startTime=" + sTime + ", endTime=" + eTime + ". Will check "
+ "timestamps in DAG started/finished events");
// Check if events TASK_STARTED, TASK_FINISHED can be made use of
for(Event event : eventList) {
switch (HistoryEventType.valueOf(event.getType())) {
case TASK_ATTEMPT_STARTED:
sTime = event.getAbsoluteTime();
break;
case TASK_ATTEMPT_FINISHED:
eTime = event.getAbsoluteTime();
break;
default:
break;
}
}
if (eTime < sTime) {
LOG.warn("TaskAttemptInfo has got wrong start/end values in events as well. "
+ "startTime=" + sTime + ", endTime=" + eTime);
}
}
startTime = sTime;
endTime = eTime;
diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
creationTime = otherInfoNode.optLong(Constants.CREATION_TIME);
creationCausalTA = StringInterner.intern(
otherInfoNode.optString(Constants.CREATION_CAUSAL_ATTEMPT));
allocationTime = otherInfoNode.optLong(Constants.ALLOCATION_TIME);
containerId = StringInterner.intern(otherInfoNode.optString(Constants.CONTAINER_ID));
String id = otherInfoNode.optString(Constants.NODE_ID);
nodeId = StringInterner.intern((id != null) ? (id.split(":")[0]) : "");
logUrl = otherInfoNode.optString(Constants.COMPLETED_LOGS_URL);
status = StringInterner.intern(otherInfoNode.optString(Constants.STATUS));
container = new Container(containerId, nodeId);
if (otherInfoNode.has(Constants.LAST_DATA_EVENTS)) {
List<DataDependencyEvent> eventInfo = Utils.parseDataEventDependencyFromJSON(
otherInfoNode.optJSONObject(Constants.LAST_DATA_EVENTS));
long lastTime = 0;
for (DataDependencyEvent item : eventInfo) {
// check these are in time order
Preconditions.checkState(lastTime < item.getTimestamp());
lastTime = item.getTimestamp();
lastDataEvents.add(item);
}
}
terminationCause = StringInterner
.intern(otherInfoNode.optString(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
executionTimeInterval = (endTime > startTime) ? (endTime - startTime) : 0;
}
public static Ordering<TaskAttemptInfo> orderingOnAllocationTime() {
return Ordering.from(new Comparator<TaskAttemptInfo>() {
@Override
public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
return (o1.getAllocationTime() < o2.getAllocationTime() ? -1
: o1.getAllocationTime() > o2.getAllocationTime() ? 1 : 0);
}
});
}
void setTaskInfo(TaskInfo taskInfo) {
this.taskInfo = Objects.requireNonNull(taskInfo, "Provide valid taskInfo");
taskInfo.addTaskAttemptInfo(this);
}
@Override
public final long getStartTimeInterval() {
return startTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
}
@Override
public final long getFinishTimeInterval() {
return endTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
}
public final boolean isSucceeded() {
return status.equals(SUCCEEDED);
}
public final List<DataDependencyEvent> getLastDataEvents() {
return lastDataEvents;
}
public final long getExecutionTimeInterval() {
return executionTimeInterval;
}
public final long getPostDataExecutionTimeInterval() {
if (getStartTime() > 0 && getFinishTime() > 0) {
// start time defaults to the actual start time
long postDataStartTime = startTime;
if (getLastDataEvents() != null && !getLastDataEvents().isEmpty()) {
// if last data event is after the start time then use last data event time
long lastEventTime = getLastDataEvents().get(getLastDataEvents().size()-1).getTimestamp();
postDataStartTime = startTime > lastEventTime ? startTime : lastEventTime;
}
return (getFinishTime() - postDataStartTime);
}
return -1;
}
public final long getAllocationToEndTimeInterval() {
return (endTime - allocationTime);
}
public final long getAllocationToStartTimeInterval() {
return (startTime - allocationTime);
}
public final long getCreationToAllocationTimeInterval() {
return (allocationTime - creationTime);
}
public final long getStartTime() {
return startTime;
}
public final long getFinishTime() {
return endTime;
}
public final long getCreationTime() {
return creationTime;
}
public final DataDependencyEvent getLastDataEventInfo(long timeThreshold) {
for (int i=lastDataEvents.size()-1; i>=0; i--) {
// walk back in time until we get first event that happened before the threshold
DataDependencyEvent item = lastDataEvents.get(i);
if (item.getTimestamp() < timeThreshold) {
return item;
}
}
return null;
}
public final long getTimeTaken() {
return getFinishTimeInterval() - getStartTimeInterval();
}
public final long getCreationTimeInterval() {
return creationTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime());
}
public final String getCreationCausalTA() {
return creationCausalTA;
}
public final long getAllocationTime() {
return allocationTime;
}
public final String getShortName() {
return getTaskInfo().getVertexInfo().getVertexName() + " : " +
taskAttemptId.substring(taskAttemptId.lastIndexOf('_', taskAttemptId.lastIndexOf('_') - 1) + 1);
}
@Override
public final String getDiagnostics() {
return diagnostics;
}
public final String getTerminationCause() {
return terminationCause;
}
public static TaskAttemptInfo create(JSONObject taskInfoObject) throws JSONException {
return new TaskAttemptInfo(taskInfoObject);
}
public final boolean isLocalityInfoAvailable() {
Map<String, TezCounter> dataLocalTask = getCounter(DAGCounter.class.getName(),
DAGCounter.DATA_LOCAL_TASKS.toString());
Map<String, TezCounter> rackLocalTask = getCounter(DAGCounter.class.getName(),
DAGCounter.RACK_LOCAL_TASKS.toString());
Map<String, TezCounter> otherLocalTask = getCounter(DAGCounter.class.getName(),
DAGCounter.OTHER_LOCAL_TASKS.toString());
if (!dataLocalTask.isEmpty() || !rackLocalTask.isEmpty() || !otherLocalTask.isEmpty()) {
return true;
}
return false;
}
public final String getDetailedStatus() {
if (!Strings.isNullOrEmpty(getTerminationCause())) {
return getStatus() + ":" + getTerminationCause();
}
return getStatus();
}
public final TezCounter getLocalityInfo() {
Map<String, TezCounter> dataLocalTask = getCounter(DAGCounter.class.getName(),
DAGCounter.DATA_LOCAL_TASKS.toString());
Map<String, TezCounter> rackLocalTask = getCounter(DAGCounter.class.getName(),
DAGCounter.RACK_LOCAL_TASKS.toString());
Map<String, TezCounter> otherLocalTask = getCounter(DAGCounter.class.getName(),
DAGCounter.OTHER_LOCAL_TASKS.toString());
if (!dataLocalTask.isEmpty()) {
return dataLocalTask.get(DAGCounter.class.getName());
}
if (!rackLocalTask.isEmpty()) {
return rackLocalTask.get(DAGCounter.class.getName());
}
if (!otherLocalTask.isEmpty()) {
return otherLocalTask.get(DAGCounter.class.getName());
}
return null;
}
public final TaskInfo getTaskInfo() {
return taskInfo;
}
public final String getTaskAttemptId() {
return taskAttemptId;
}
public final String getNodeId() {
return nodeId;
}
public final String getStatus() {
return status;
}
public final Container getContainer() {
return container;
}
public final String getLogURL() {
return logUrl;
}
/**
* Get merge counter per source. Available in case of reducer task
*
* @return Map<String, TezCounter> merge phase time at every counter group level
*/
public final Map<String, TezCounter> getMergePhaseTime() {
return getCounter(null, TaskCounter.MERGE_PHASE_TIME.name());
}
/**
* Get shuffle counter per source. Available in case of shuffle
*
* @return Map<String, TezCounter> shuffle phase time at every counter group level
*/
public final Map<String, TezCounter> getShufflePhaseTime() {
return getCounter(null, TaskCounter.SHUFFLE_PHASE_TIME.name());
}
/**
* Get OUTPUT_BYTES counter per source. Available in case of map outputs
*
* @return Map<String, TezCounter> output bytes counter at every counter group
*/
public final Map<String, TezCounter> getTaskOutputBytes() {
return getCounter(null, TaskCounter.OUTPUT_BYTES.name());
}
/**
* Get number of spills per source. (SPILLED_RECORDS / OUTPUT_RECORDS)
*
* @return Map<String, Long> spill count details
*/
public final Map<String, Float> getSpillCount() {
Map<String, TezCounter> outputRecords = getCounter(null, "OUTPUT_RECORDS");
Map<String, TezCounter> spilledRecords = getCounter(null, "SPILLED_RECORDS");
Map<String, Float> result = Maps.newHashMap();
for (Map.Entry<String, TezCounter> entry : spilledRecords.entrySet()) {
String source = entry.getKey();
long spilledVal = entry.getValue().getValue();
long outputVal = outputRecords.get(source).getValue();
result.put(source, (spilledVal * 1.0f) / (outputVal * 1.0f));
}
return result;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[");
sb.append("taskAttemptId=").append(getTaskAttemptId()).append(", ");
sb.append("creationTime=").append(getCreationTimeInterval()).append(", ");
sb.append("startTime=").append(getStartTimeInterval()).append(", ");
sb.append("finishTime=").append(getFinishTimeInterval()).append(", ");
sb.append("timeTaken=").append(getTimeTaken()).append(", ");
sb.append("events=").append(getEvents()).append(", ");
sb.append("diagnostics=").append(getDiagnostics()).append(", ");
sb.append("container=").append(getContainer()).append(", ");
sb.append("nodeId=").append(getNodeId()).append(", ");
sb.append("logURL=").append(getLogURL()).append(", ");
sb.append("status=").append(getDetailedStatus());
sb.append("]");
return sb.toString();
}
}