blob: d640704f93f1b75285654f101c52dcb0b559f2f5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.analyzer.plugins;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;
import java.util.List;
import java.util.Map;
/**
 * Reports, per vertex, how its tasks were placed relative to their data (data local, rack
 * local, other) together with the average task runtime and average HDFS bytes read for each
 * locality category. This helps correlate a vertex's runtime with its data locality.
 * <p/>
 * A vertex is reported only when its vertex-level counters record at least one data-local,
 * rack-local or other-local task. A tuning recommendation is emitted when the data-local ratio
 * falls below {@code tez.locality-analyzer.data.local.ratio} (default 0.5).
 */
public class LocalityAnalyzer extends TezAnalyzerBase implements Analyzer {

  /** CSV column names, in the order values are appended in {@link #analyze(DagInfo)}. */
  private static final String[] HEADERS = { "vertexName", "numTasks", "dataLocalRatio",
      "rackLocalRatio", "otherRatio", "avgDataLocalTaskRuntime", "avgRackLocalTaskRuntime",
      "avgOtherLocalTaskRuntime", "noOfInputs", "avgHDFSBytesRead_DataLocal",
      "avgHDFSBytesRead_RackLocal", "avgHDFSBytesRead_Others", "recommendation" };

  /** Config key: data-local ratio below which a tuning recommendation is emitted. */
  private static final String DATA_LOCAL_RATIO = "tez.locality-analyzer.data.local.ratio";
  private static final float DATA_LOCAL_RATIO_DEFAULT = 0.5f;

  private final CSVResult csvResult;

  public LocalityAnalyzer(Configuration config) {
    super(config);
    csvResult = new CSVResult(HEADERS);
  }

  /**
   * Adds one CSV record per vertex that has locality information.
   *
   * @param dagInfo parsed DAG history to analyze
   * @throws TezException as declared by the {@link Analyzer} contract
   */
  @Override
  public void analyze(DagInfo dagInfo) throws TezException {
    // The threshold is the same for every vertex; read it once instead of per vertex.
    float dataLocalRatioThreshold =
        getConf().getFloat(DATA_LOCAL_RATIO, DATA_LOCAL_RATIO_DEFAULT);

    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
      long dataLocalTasks = getVertexDagCounter(vertexInfo, DAGCounter.DATA_LOCAL_TASKS);
      long rackLocalTasks = getVertexDagCounter(vertexInfo, DAGCounter.RACK_LOCAL_TASKS);
      long otherLocalTasks = getVertexDagCounter(vertexInfo, DAGCounter.OTHER_LOCAL_TASKS);
      long totalVertexTasks = vertexInfo.getNumTasks();

      // Vertices whose located tasks all ran off-rack are included (otherLocalTasks), since
      // they are the worst-locality case. Vertices with no recorded task count are skipped,
      // because the ratios below would otherwise be NaN/Infinity.
      boolean hasLocalityInfo = dataLocalTasks > 0 || rackLocalTasks > 0 || otherLocalTasks > 0;
      if (!hasLocalityInfo || totalVertexTasks <= 0) {
        continue;
      }

      float dataLocalRatio = dataLocalTasks * 1.0f / totalVertexTasks;
      float rackLocalRatio = rackLocalTasks * 1.0f / totalVertexTasks;
      // "Other" is everything that is neither data local nor rack local, so it also counts
      // tasks that recorded no locality counter at all.
      float othersRatio = (totalVertexTasks - (dataLocalTasks + rackLocalTasks)) * 1.0f
          / totalVertexTasks;

      TaskAttemptDetails dataLocalResult =
          computeAverages(vertexInfo, DAGCounter.DATA_LOCAL_TASKS);
      TaskAttemptDetails rackLocalResult =
          computeAverages(vertexInfo, DAGCounter.RACK_LOCAL_TASKS);
      TaskAttemptDetails otherTaskResult =
          computeAverages(vertexInfo, DAGCounter.OTHER_LOCAL_TASKS);

      List<String> record = Lists.newArrayList();
      record.add(vertexInfo.getVertexName());
      record.add(String.valueOf(totalVertexTasks));
      record.add(String.valueOf(dataLocalRatio));
      record.add(String.valueOf(rackLocalRatio));
      record.add(String.valueOf(othersRatio));
      record.add(String.valueOf(dataLocalResult.avgRuntime));
      record.add(String.valueOf(rackLocalResult.avgRuntime));
      record.add(String.valueOf(otherTaskResult.avgRuntime));
      // Number of inputs = input edges plus additional (root) inputs.
      record.add(String.valueOf(vertexInfo.getInputEdges().size()
          + vertexInfo.getAdditionalInputInfoList().size()));
      // Average HDFS bytes read for each locality category.
      record.add(String.valueOf(dataLocalResult.avgHDFSBytesRead));
      record.add(String.valueOf(rackLocalResult.avgHDFSBytesRead));
      record.add(String.valueOf(otherTaskResult.avgHDFSBytesRead));

      String recommendation = "";
      if (dataLocalRatio < dataLocalRatioThreshold) {
        recommendation = "Data locality is poor for this vertex. Try tuning "
            + TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS + ", "
            + TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED + ", "
            + TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED;
      }
      record.add(recommendation);

      csvResult.addRecord(record.toArray(new String[record.size()]));
    }
  }

  /**
   * Reads a {@link DAGCounter} value from the vertex-level counters.
   *
   * @param vertexInfo vertex to read from
   * @param counter    counter to read
   * @return the counter value, or 0 when the vertex did not record it
   */
  private static long getVertexDagCounter(VertexInfo vertexInfo, DAGCounter counter) {
    return sumCounterValues(
        vertexInfo.getCounter(DAGCounter.class.getName(), counter.toString()));
  }

  /**
   * Sums every counter in a map returned by {@code getCounter(group, name)}.
   * <p/>
   * Summing all values, rather than looking up one key, does not depend on how the returned
   * map is keyed. A null or empty map counts as 0.
   *
   * @param counters map of counters, may be null
   * @return sum of all counter values
   */
  private static long sumCounterValues(Map<String, TezCounter> counters) {
    long total = 0;
    if (counters != null) {
      for (TezCounter tezCounter : counters.values()) {
        total += tezCounter.getValue();
      }
    }
    return total;
  }

  /**
   * Computes the average runtime and HDFS bytes read of the task attempts of a vertex whose
   * value for the given locality counter is positive.
   * <p/>
   * NOTE(review): this iterates every attempt returned by {@code getTaskAttempts()}. Confirm
   * whether failed/killed attempts are included there.
   *
   * @param vertexInfo vertex whose task attempts are examined
   * @param counter    locality counter selecting the attempts to average
   * @return task attempt details; both averages are 0 when no attempt matched
   */
  private TaskAttemptDetails computeAverages(VertexInfo vertexInfo, DAGCounter counter) {
    long totalTime = 0;
    long matchingAttempts = 0;
    long totalHDFSBytesRead = 0;
    for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
      long localityValue = sumCounterValues(
          attemptInfo.getCounter(DAGCounter.class.getName(), counter.toString()));
      if (localityValue <= 0) {
        continue;
      }
      totalTime += attemptInfo.getTimeTaken();
      matchingAttempts++;
      totalHDFSBytesRead += sumCounterValues(attemptInfo.getCounter(
          FileSystemCounter.class.getName(), FileSystemCounter.HDFS_BYTES_READ.name()));
    }

    TaskAttemptDetails result = new TaskAttemptDetails();
    if (matchingAttempts > 0) {
      result.avgRuntime = totalTime * 1.0f / matchingAttempts;
      result.avgHDFSBytesRead = totalHDFSBytesRead * 1.0f / matchingAttempts;
    }
    return result;
  }

  /** @return the CSV result accumulated by {@link #analyze(DagInfo)} */
  @Override
  public CSVResult getResult() throws TezException {
    return csvResult;
  }

  @Override
  public String getName() {
    return "Locality Analyzer";
  }

  @Override
  public String getDescription() {
    return "Analyze for locality information (data local, rack local, off-rack)";
  }

  /**
   * Placeholder for per-locality averages of task attempts.
   */
  static class TaskAttemptDetails {
    /** Average HDFS_BYTES_READ across the matching attempts. */
    float avgHDFSBytesRead;
    /** Average of {@code getTimeTaken()} across the matching attempts (units as reported). */
    float avgRuntime;
  }

  public static void main(String[] args) throws Exception {
    Configuration config = new Configuration();
    LocalityAnalyzer analyzer = new LocalityAnalyzer(config);
    int res = ToolRunner.run(config, analyzer, args);
    analyzer.printResults();
    System.exit(res);
  }
}