blob: c70fdaef6962951ca3d1246afeb169ff72a38c09 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.web;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeMap;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.LimitExceededException;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.client.ProgressBuilder;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.webapp.Controller;
import org.apache.hadoop.yarn.webapp.MimeType;
import org.apache.hadoop.yarn.webapp.View;
import org.apache.hadoop.yarn.webapp.WebAppException;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
public class AMWebController extends Controller {
// Logger used for the debug output emitted by this controller.
private final static Logger LOG = LoggerFactory.getLogger(AMWebController.class);
// HTTP CORS Response Headers
static final String ACCESS_CONTROL_ALLOW_ORIGIN = "Access-Control-Allow-Origin";
static final String ACCESS_CONTROL_ALLOW_CREDENTIALS = "Access-Control-Allow-Credentials";
static final String ACCESS_CONTROL_ALLOW_METHODS = "Access-Control-Allow-Methods";
static final String ACCESS_CONTROL_ALLOW_HEADERS = "Access-Control-Allow-Headers";
static final String ACCESS_CONTROL_MAX_AGE = "Access-Control-Max-Age";
// CORS default responses.
static final String ALLOWED_METHODS = "GET, HEAD";
static final String ALLOWED_HEADERS = "X-Requested-With,Content-Type,Accept,Origin";
// Keys under which the progress endpoints place their payload in the rendered JSON map.
static final String DAG_PROGRESS = "dagProgress";
static final String VERTEX_PROGRESS = "vertexProgress";
static final String VERTEX_PROGRESSES = "vertexProgresses";
// Upper bound applied when parsing comma-separated vertex id lists, and the default "limit"
// used by getTasksInfo/getAttemptsInfo when the request carries no parseable limit.
static final int MAX_QUERIED = 100;
// NOTE(review): presumably the version of the AM web API exposed by this class - confirm.
public static final String VERSION = "2";
// AM application context; source of the current DAG, the AM configuration and the ACL manager.
private AppContext appContext;
// Injected "TezUIHistoryURL" value; assigned in the constructor but not read by this class.
private String historyUrl;
/**
 * Creates the controller with its injected collaborators.
 *
 * @param requestContext request context passed through to the {@link Controller} superclass
 * @param appContext AM application context used to look up the current DAG, the AM
 *                   configuration and the ACL manager
 * @param historyUrl value injected under the name "TezUIHistoryURL"
 */
@Inject
public AMWebController(RequestContext requestContext,
AppContext appContext,
@Named("TezUIHistoryURL") String historyUrl) {
super(requestContext);
this.appContext = appContext;
this.historyUrl = historyUrl;
}
/**
 * Default action of the controller; delegates to {@link #ui()}.
 */
@Override
public void index() {
ui();
}
/**
 * Renders the {@link StaticAMView} page.
 */
public void ui() {
render(StaticAMView.class);
}
/**
 * Alias of {@link #ui()}; renders the same page.
 */
public void main() {
ui();
}
/**
 * Renders a fixed description string as the JSON response.
 */
public void about() {
renderJSON("Tez AM UI WebServices");
}
/**
 * Writes the CORS response headers onto the current response.
 *
 * <p>The allowed origin is built from the protocol and authority of the configured Tez history
 * URL base. When that value is not a valid URL no {@code Access-Control-Allow-Origin} header is
 * set; the remaining CORS headers are always written.
 */
@VisibleForTesting
public void setCorsHeaders() {
  final HttpServletResponse httpResponse = response();

  // Ideally the Origin and other CORS request headers would be checked and the response headers
  // set only when they match the allowed origins; however the RM does not forward these headers.
  final String configuredBase =
      appContext.getAMConf().get(TezConfiguration.TEZ_HISTORY_URL_BASE, "");

  String allowedOrigin;
  try {
    final URL baseUrl = new URL(configuredBase);
    allowedOrigin = baseUrl.getProtocol() + "://" + baseUrl.getAuthority();
  } catch (MalformedURLException e) {
    allowedOrigin = null;
    if (LOG.isDebugEnabled()) {
      LOG.debug("Invalid url set for tez history url base: " + configuredBase, e);
    }
  }

  if (allowedOrigin != null) {
    httpResponse.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN, allowedOrigin);
  }
  httpResponse.setHeader(ACCESS_CONTROL_ALLOW_METHODS, ALLOWED_METHODS);
  httpResponse.setHeader(ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE.toString());
  httpResponse.setHeader(ACCESS_CONTROL_ALLOW_HEADERS, ALLOWED_HEADERS);
  httpResponse.setHeader(ACCESS_CONTROL_MAX_AGE, "1800");
}
/**
 * Sends an HTTP error response to the client.
 *
 * @param sc HTTP status code to send
 * @param msg message sent with the error; also logged at debug level
 * @param e exception that caused the error, used for debug logging only; may be {@code null}
 * @throws WebAppException wrapping the {@link IOException} raised while sending the error
 */
void sendErrorResponse(int sc, String msg, Exception e) {
  if (LOG.isDebugEnabled()) {
    LOG.debug(msg, e);
  }

  try {
    response().sendError(sc, msg);
  } catch (IOException ioe) {
    // Wrap the failure that actually happened. The caller-supplied 'e' is unrelated to the
    // send failure and is frequently null, which would hide the real cause.
    throw new WebAppException(ioe);
  }
}
/**
 * Decides whether the given caller may view the DAG.
 *
 * @param callerUGI the caller, or {@code null} for an anonymous request
 * @param appContext context providing the AM ACL manager
 * @return for an anonymous caller, {@code true} iff ACLs are disabled; otherwise the result of
 *         the ACL manager's DAG view access check
 */
@VisibleForTesting
static boolean _hasAccess(UserGroupInformation callerUGI, AppContext appContext) {
  if (callerUGI != null) {
    return appContext.getAMACLManager().checkDAGViewAccess(callerUGI);
  }
  // Allow anonymous access iff acls disabled
  final boolean aclsEnabled = appContext.getAMACLManager().isAclsEnabled();
  return !aclsEnabled;
}
/**
 * Checks whether the remote user of the current request may view the DAG.
 *
 * @return result of {@link #_hasAccess(UserGroupInformation, AppContext)}; a missing or empty
 *         remote user is treated as an anonymous caller
 */
public boolean hasAccess() {
  final String user = request().getRemoteUser();
  final boolean anonymous = (user == null) || user.isEmpty();
  final UserGroupInformation caller =
      anonymous ? null : UserGroupInformation.createRemoteUser(user);
  return _hasAccess(caller, appContext);
}
/**
 * Renders the progress of the current DAG as JSON under the key {@code dagProgress}.
 *
 * <p>Responds with 401 when access is denied, 400 when the dag id parameter is not an integer
 * and 404 when the requested dag is not the current one.
 */
public void getDagProgress() {
  setCorsHeaders();

  if (!hasAccess()) {
    sendErrorResponse(HttpServletResponse.SC_UNAUTHORIZED, "Access denied for user: " +
        request().getRemoteUser(), null);
    return;
  }

  final int requestedDagId;
  try {
    requestedDagId = getQueryParamInt(WebUIService.DAG_ID);
  } catch (NumberFormatException e) {
    sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST, "Invalid dag id:", e);
    return;
  }

  final DAG dag = appContext.getCurrentDAG();
  final boolean isCurrentDag = dag != null && dag.getID().getId() == requestedDagId;
  if (!isCurrentDag) {
    sendErrorResponse(HttpServletResponse.SC_NOT_FOUND, "Not current Dag: " + requestedDagId,
        null);
    return;
  }

  final Map<String, ProgressInfo> payload = new HashMap<String, ProgressInfo>();
  final ProgressInfo dagProgress =
      new ProgressInfo(dag.getID().toString(), dag.getCompletedTaskProgress());
  payload.put(DAG_PROGRESS, dagProgress);
  renderJSON(payload);
}
/**
 * Renders the progress of a single vertex of the current DAG as JSON under the key
 * {@code vertexProgress}.
 *
 * <p>Responds with 401 when access is denied, 400 when the dag or vertex id is not an integer
 * and 404 when the dag is not the current one or the vertex does not exist.
 */
public void getVertexProgress() {
  setCorsHeaders();

  if (!hasAccess()) {
    sendErrorResponse(HttpServletResponse.SC_UNAUTHORIZED, "Access denied for user: " +
        request().getRemoteUser(), null);
    return;
  }

  final int requestedDagId;
  final int requestedVertexIdx;
  try {
    requestedDagId = getQueryParamInt(WebUIService.DAG_ID);
    requestedVertexIdx = getQueryParamInt(WebUIService.VERTEX_ID);
  } catch (NumberFormatException e) {
    sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST, "Invalid dag or vertex id", e);
    return;
  }

  final DAG dag = appContext.getCurrentDAG();
  final boolean isCurrentDag = dag != null && dag.getID().getId() == requestedDagId;
  if (!isCurrentDag) {
    sendErrorResponse(HttpServletResponse.SC_NOT_FOUND, "Not current Dag: " + requestedDagId,
        null);
    return;
  }

  final TezVertexID vertexId = TezVertexID.getInstance(dag.getID(), requestedVertexIdx);
  final Vertex target = dag.getVertex(vertexId);
  if (target == null) {
    sendErrorResponse(HttpServletResponse.SC_NOT_FOUND, "vertex not found: " + requestedVertexIdx,
        null);
    return;
  }

  final Map<String, ProgressInfo> payload = new HashMap<String, ProgressInfo>();
  final ProgressInfo vertexProgress =
      new ProgressInfo(vertexId.toString(), target.getCompletedTaskProgress());
  payload.put(VERTEX_PROGRESS, vertexProgress);
  renderJSON(payload);
}
/**
 * Looks up the vertices of a DAG by their indexes.
 *
 * @param dag DAG whose vertices are looked up
 * @param indexes vertex indexes within the DAG
 * @return the vertices that were found, in iteration order of {@code indexes}; indexes that do
 *         not resolve to a vertex are skipped
 */
Collection<Vertex> getVerticesByIdx(DAG dag, Collection<Integer> indexes) {
  final TezDAGID dagId = dag.getID();
  final List<Vertex> found = new ArrayList<Vertex>(indexes.size());

  for (Integer index : indexes) {
    final TezVertexID vertexId = TezVertexID.getInstance(dagId, index);
    final Vertex match = (vertexId == null) ? null : dag.getVertex(vertexId);
    if (match != null) {
      found.add(match);
    }
  }
  return found;
}
/**
 * Reads a request parameter and parses its trimmed value as an integer.
 *
 * @param name name of the request parameter
 * @return the parsed value
 * @throws NumberFormatException if the trimmed value is not a valid integer
 */
int getQueryParamInt(String name) throws NumberFormatException {
  return Integer.parseInt($(name).trim());
}
/**
 * Renders the progress of several vertices of the current DAG as JSON under the key
 * {@code vertexProgresses}.
 *
 * <p>The vertex id parameter is a comma separated list of vertex indexes. When it is empty the
 * progress of every vertex of the DAG is returned. At most {@link #MAX_QUERIED} ids are
 * considered; any further ids are ignored.
 *
 * <p>Responds with 401 when access is denied, 400 when the dag id or one of the considered
 * vertex ids is not an integer and 404 when the requested dag is not the current one.
 */
public void getVertexProgresses() {
  setCorsHeaders();

  if (!hasAccess()) {
    sendErrorResponse(HttpServletResponse.SC_UNAUTHORIZED, "Access denied for user: " +
        request().getRemoteUser(), null);
    return;
  }

  int dagID;
  List<Integer> vertexIDs = new ArrayList<Integer>();
  try {
    dagID = getQueryParamInt(WebUIService.DAG_ID);

    final String vertexIdParam = $(WebUIService.VERTEX_ID).trim();
    // "".split(",") yields a single empty token, which can never parse as an integer. Skip
    // parsing so that an empty parameter means "all vertices" as the code below intends.
    if (!vertexIdParam.isEmpty()) {
      // Split fully and cap the count explicitly: a positive split limit would leave the
      // unsplit remainder (commas included) in the last element instead of dropping it.
      final String[] tokens = vertexIdParam.split(",", -1);
      final int taken = Math.min(tokens.length, MAX_QUERIED);
      for (int i = 0; i < taken; i++) {
        vertexIDs.add(Integer.parseInt(tokens[i]));
      }
    }
  } catch (NumberFormatException e) {
    sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST, "Invalid dag or vertices id", e);
    return;
  }

  DAG currentDAG = appContext.getCurrentDAG();
  if (currentDAG == null || currentDAG.getID().getId() != dagID) {
    sendErrorResponse(HttpServletResponse.SC_NOT_FOUND, "Not current Dag: " + dagID, null);
    return;
  }

  Collection<Vertex> vertices;
  if (vertexIDs.isEmpty()) {
    // no ids specified, report all vertices of the dag.
    vertices = currentDAG.getVertices().values();
  } else {
    vertices = getVerticesByIdx(currentDAG, vertexIDs);
  }

  Collection<ProgressInfo> progresses = new ArrayList<ProgressInfo>(vertices.size());
  for (Vertex vertex : vertices) {
    progresses.add(new ProgressInfo(vertex.getVertexId().toString(),
        vertex.getCompletedTaskProgress()));
  }

  Map<String, Collection<ProgressInfo>> result = new HashMap<String, Collection<ProgressInfo>>();
  result.put(VERTEX_PROGRESSES, progresses);
  renderJSON(result);
}
// AM WebApi V2.
/**
 * Common preamble of the V2 endpoints: sets the CORS headers and checks access.
 *
 * @return {@code true} if the caller has access; {@code false} after a 401 error response has
 *         been sent
 */
@VisibleForTesting
protected boolean setupResponse() {
  setCorsHeaders();

  final boolean permitted = hasAccess();
  if (!permitted) {
    sendErrorResponse(HttpServletResponse.SC_UNAUTHORIZED, "Access denied for user: " +
        request().getRemoteUser(), null);
  }
  return permitted;
}
/**
 * Resolves the DAG addressed by the request's dag id parameter.
 *
 * @return the current DAG if its id matches the requested one; {@code null} after an error
 *         response has been sent (400 for a non-integer dag id, 404 when the requested dag is
 *         not the current one)
 */
DAG checkAndGetDAGFromRequest() {
  final int requestedDagId;
  try {
    requestedDagId = getQueryParamInt(WebUIService.DAG_ID);
  } catch (NumberFormatException e) {
    sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST, "Invalid dag id", e);
    return null;
  }

  final DAG current = appContext.getCurrentDAG();
  if (current != null && current.getID().getId() == requestedDagId) {
    return current;
  }

  sendErrorResponse(HttpServletResponse.SC_NOT_FOUND, "Not current Dag: " + requestedDagId, null);
  return null;
}
/**
 * Parses the comma separated vertex id parameter of the request.
 *
 * <p>At most {@link #MAX_QUERIED} ids are considered; any further ids are ignored.
 *
 * @return the parsed vertex indexes (empty when the parameter is empty), or {@code null} after
 *         a 400 error response has been sent because a considered id is not an integer
 */
Collection<Integer> getVertexIDsFromRequest() {
  final String valueStr = $(WebUIService.VERTEX_ID).trim();
  final List<Integer> vertexIDs = new ArrayList<Integer>();
  if (valueStr.isEmpty()) {
    return vertexIDs;
  }

  // Split fully and cap the count explicitly: a positive split limit would leave the unsplit
  // remainder (commas included) in the last element, turning an over-long list into a
  // spurious "invalid vertex ID" error instead of truncating it.
  final String[] vertexIdsStr = valueStr.split(",", -1);
  final int taken = Math.min(vertexIdsStr.length, MAX_QUERIED);
  try {
    for (int i = 0; i < taken; i++) {
      vertexIDs.add(Integer.parseInt(vertexIdsStr[i]));
    }
  } catch (NumberFormatException nfe) {
    sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST,
        "invalid vertex ID passed in as parameter", nfe);
    return null;
  }
  return vertexIDs;
}
/**
 * Parses the counters request parameter, given in the format
 * {@code CtrGroup/CtrName1,CtrName2;CtrGroup2;}.
 *
 * <p>Groups are separated by {@code ;}. Within a group, an optional {@code /} separates the
 * group name from a {@code ,} separated list of counter names. A group without counter names
 * maps to an empty set.
 *
 * @return map from counter group name to the requested counter names of that group, or
 *         {@code null} if nothing was specified
 */
Map<String, Set<String>> getCounterListFromRequest() {
  final String rawCounters = $(WebUIService.COUNTERS).trim();
  if (rawCounters.isEmpty()) {
    return null;
  }

  final Map<String, Set<String>> requested = new TreeMap<String, Set<String>>();
  final StringTokenizer groupSpecs = new StringTokenizer(rawCounters, ";");
  while (groupSpecs.hasMoreTokens()) {
    final String spec = groupSpecs.nextToken().trim();
    final int slash = spec.indexOf("/");
    final boolean hasSlash = slash != -1;

    final String groupName = hasSlash ? spec.substring(0, slash) : spec;
    Set<String> names = Collections.<String>emptySet();
    // Only parse names when something follows the slash.
    if (hasSlash && slash < spec.length() - 1) {
      final String nameList = spec.substring(slash + 1, spec.length());
      names = Sets.newHashSet(
          Splitter.on(",").omitEmptyStrings().trimResults().split(nameList));
    }
    requested.put(groupName, names);
  }
  return requested;
}
/**
 * Splits a string into tokens, keeping at most {@code limit} of them.
 *
 * <p>Tokenizing is done with {@link StringTokenizer}, so every character of {@code delimiter}
 * acts as a separator and empty tokens are never produced.
 *
 * @param str string to split
 * @param delimiter delimiter characters
 * @param limit maximum number of tokens to return
 * @return the first {@code limit} tokens, in order; empty if {@code limit} is not positive
 */
List<String> splitString(String str, String delimiter, Integer limit) {
  final List<String> pieces = new ArrayList<String>();
  final StringTokenizer tokens = new StringTokenizer(str, delimiter);
  while (tokens.hasMoreTokens() && pieces.size() < limit) {
    pieces.add(tokens.nextToken());
  }
  return pieces;
}
/**
 * Parses a request parameter holding comma separated integers.
 *
 * @param paramName name of the request parameter
 * @param limit maximum number of values to be taken
 * @return the parsed values (empty when the parameter is empty), or {@code null} after a 400
 *         error response has been sent because one of the values is not an integer
 */
List<Integer> getIntegersFromRequest(String paramName, Integer limit) {
  final String raw = $(paramName).trim();
  final List<Integer> parsed = new ArrayList<Integer>();
  if (raw.equals("")) {
    return parsed;
  }

  try {
    for (String token : splitString(raw, ",", limit)) {
      parsed.add(Integer.parseInt(token));
    }
    return parsed;
  } catch (NumberFormatException nfe) {
    sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST,
        String.format("invalid %s passed in as parameter", paramName), nfe);
    return null;
  }
}
/**
 * Parses a request parameter holding comma separated, underscore-joined index tuples.
 *
 * <p>For example {@code "1_0,1_3"} with {@code count == 2} yields {@code [[1,0],[1,3]]}. For
 * tasks an entry is {@code vertexIndex_taskIndex}; for attempts it is
 * {@code vertexIndex_taskIndex_attemptNo}. Entries that do not consist of exactly
 * {@code count} parts are skipped.
 *
 * @param paramName name of the request parameter
 * @param limit maximum number of comma separated entries to be taken
 * @param count number of underscore separated parts each entry must have
 * @return the parsed tuples (empty when the parameter is empty), or {@code null} after a 400
 *         error response has been sent because a part of a considered entry is not an integer
 */
List<List<Integer>> getIDsFromRequest(String paramName, Integer limit, Integer count) {
  final String raw = $(paramName).trim();
  final List<List<Integer>> parsed = new ArrayList<List<Integer>>();
  if (raw.equals("")) {
    return parsed;
  }

  try {
    for (String entry : splitString(raw, ",", limit)) {
      final List<Integer> tuple = new ArrayList<Integer>();
      final String[] parts = entry.split("_");
      if (parts.length != count) {
        // wrong number of parts: ignore this entry
        continue;
      }
      for (String part : parts) {
        tuple.add(Integer.parseInt(part));
      }
      parsed.add(tuple);
    }
    return parsed;
  } catch (NumberFormatException nfe) {
    sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST,
        String.format("invalid %s passed in as parameter", paramName), nfe);
    return null;
  }
}
/**
 * Renders id, progress, status and (when requested) counters of the current DAG as JSON under
 * the key {@code dag}.
 */
public void getDagInfo() {
  if (!setupResponse()) {
    return;
  }

  final DAG dag = checkAndGetDAGFromRequest();
  if (dag == null) {
    return;
  }

  final Map<String, Set<String>> requestedCounters = getCounterListFromRequest();

  final Map<String, Object> info = new HashMap<String, Object>();
  info.put("id", dag.getID().toString());
  info.put("progress", Float.toString(dag.getCompletedTaskProgress()));
  info.put("status", dag.getState().toString());

  final boolean countersRequested = requestedCounters != null && !requestedCounters.isEmpty();
  if (countersRequested) {
    try {
      final TezCounters cached = dag.getCachedCounters();
      final Map<String, Map<String, Long>> counterInfo =
          constructCounterMapInfo(cached, requestedCounters);
      if (counterInfo != null && !counterInfo.isEmpty()) {
        info.put("counters", counterInfo);
      }
    } catch (LimitExceededException e) {
      // Ignore
      // TODO: add an error message instead for counter key
    }
  }

  renderJSON(ImmutableMap.of("dag", info));
}
/**
 * Extracts the requested counters into a nested group-name to counter-name to value map.
 *
 * <p>If {@code counterNames} contains the key {@code "*"}, every counter of every group is
 * returned. Otherwise each requested group is looked up; an empty name set selects all
 * counters of that group, a non-empty set only the named ones.
 *
 * @param counters counters to read values from
 * @param counterNames requested counter groups and names
 * @return the matched counters keyed by group name, or {@code null} if {@code counterNames} is
 *         {@code null} or empty
 */
Map<String, Map<String, Long>> constructCounterMapInfo(TezCounters counters,
    Map<String, Set<String>> counterNames) {
  if (counterNames == null || counterNames.isEmpty()) {
    return null;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Requested counter names=" + counterNames.entrySet());
    LOG.debug("actual counters=" + counters);
  }

  final Map<String, Map<String, Long>> result = new TreeMap<String, Map<String, Long>>();

  if (counterNames.containsKey("*")) {
    // wildcard: dump every counter of every group
    for (CounterGroup group : counters) {
      final Map<String, Long> values = new HashMap<String, Long>();
      for (TezCounter counter : group) {
        values.put(counter.getName(), counter.getValue());
      }
      result.put(group.getName(), values);
    }
    return result;
  }

  for (Entry<String, Set<String>> request : counterNames.entrySet()) {
    final String groupName = request.getKey();
    final Set<String> wanted = request.getValue();
    final Map<String, Long> values = new HashMap<String, Long>();
    for (TezCounter counter : counters.getGroup(groupName)) {
      if (wanted.isEmpty() || wanted.contains(counter.getName())) {
        values.put(counter.getName(), counter.getValue());
      }
    }
    result.put(groupName, values);
  }
  return result;
}
/**
 * Builds the JSON-ready description of a vertex: id, status, progress, task counts and (when
 * requested) counters.
 *
 * @param vertex vertex to describe
 * @param counterNames requested counter groups and names; {@code null} or empty for none
 * @return map of attribute name to value
 */
private Map<String, Object> getVertexInfoMap(Vertex vertex,
    Map<String, Set<String>> counterNames) {
  final Map<String, Object> info = new HashMap<String, Object>();
  info.put("id", vertex.getVertexId().toString());
  info.put("status", vertex.getState().toString());
  info.put("progress", Float.toString(vertex.getCompletedTaskProgress()));

  final ProgressBuilder taskStats = vertex.getVertexProgress();
  info.put("totalTasks", Integer.toString(taskStats.getTotalTaskCount()));
  info.put("runningTasks", Integer.toString(taskStats.getRunningTaskCount()));
  info.put("succeededTasks", Integer.toString(taskStats.getSucceededTaskCount()));
  info.put("failedTaskAttempts", Integer.toString(taskStats.getFailedTaskAttemptCount()));
  info.put("killedTaskAttempts", Integer.toString(taskStats.getKilledTaskAttemptCount()));

  final boolean countersRequested = counterNames != null && !counterNames.isEmpty();
  if (countersRequested) {
    try {
      final TezCounters cached = vertex.getCachedCounters();
      final Map<String, Map<String, Long>> counterInfo =
          constructCounterMapInfo(cached, counterNames);
      if (counterInfo != null && !counterInfo.isEmpty()) {
        info.put("counters", counterInfo);
      }
    } catch (LimitExceededException e) {
      // Ignore
      // TODO: add an error message instead for counter key
    }
  }

  return info;
}
/**
 * Renders the description of the requested vertices (or of all vertices when no ids are given)
 * of the current DAG as JSON under the key {@code vertices}.
 */
public void getVerticesInfo() {
  if (!setupResponse()) {
    return;
  }

  final DAG dag = checkAndGetDAGFromRequest();
  if (dag == null) {
    return;
  }

  final Collection<Integer> requestedIDs = getVertexIDsFromRequest();
  if (requestedIDs == null) {
    return;
  }

  final Map<String, Set<String>> requestedCounters = getCounterListFromRequest();

  // no ids specified return all.
  final Collection<Vertex> selected = requestedIDs.isEmpty()
      ? dag.getVertices().values()
      : getVerticesByIdx(dag, requestedIDs);

  final ArrayList<Map<String, Object>> verticesInfo = new ArrayList<Map<String, Object>>();
  for (Vertex vertex : selected) {
    verticesInfo.add(getVertexInfoMap(vertex, requestedCounters));
  }

  renderJSON(ImmutableMap.of("vertices", verticesInfo));
}
/**
 * Looks up a vertex of a DAG by its index.
 *
 * @param dag DAG to search
 * @param vertexIndex index of the vertex within the DAG
 * @return whatever {@code dag.getVertex} returns for the vertex id built from the index
 */
Vertex getVertexFromIndex(DAG dag, Integer vertexIndex) {
  return dag.getVertex(TezVertexID.getInstance(dag.getID(), vertexIndex));
}
/**
 * Heart of getTasksInfo. Selects tasks of the given DAG based on the request parameters.
 *
 * <p>Selection order:
 * <ol>
 *   <li>if task ids ({@code vertexIndex_taskIndex}) are given, exactly those tasks;</li>
 *   <li>otherwise, if vertex ids are given, the tasks of those vertices;</li>
 *   <li>otherwise, the tasks of all vertices of the DAG.</li>
 * </ol>
 * Ids that do not resolve to a vertex or task are skipped. A non-positive {@code limit} yields
 * an empty list.
 *
 * @param dag DAG to select tasks from
 * @param limit maximum number of tasks to return
 * @return at most {@code limit} tasks, or {@code null} after an error response has been sent
 *         because a task or vertex id parameter could not be parsed
 */
List<Task> getRequestedTasks(DAG dag, Integer limit) {
  List<Task> tasks = new ArrayList<Task>();
  if (limit <= 0) {
    // Nothing can be returned. Also keeps a negative limit away from subList(), which would
    // otherwise throw and surface as a server error.
    return tasks;
  }

  List<List<Integer>> taskIDs = getIDsFromRequest(WebUIService.TASK_ID, limit, 2);
  if (taskIDs == null) {
    return null;
  }

  if (!taskIDs.isEmpty()) {
    for (List<Integer> indexes : taskIDs) {
      Vertex vertex = getVertexFromIndex(dag, indexes.get(0));
      if (vertex == null) {
        continue;
      }
      Task task = vertex.getTask(indexes.get(1));
      if (task == null) {
        continue;
      }
      tasks.add(task);
      if (tasks.size() >= limit) {
        break;
      }
    }
    return tasks;
  }

  List<Integer> vertexIDs = getIntegersFromRequest(WebUIService.VERTEX_ID, limit);
  if (vertexIDs == null) {
    return null;
  }

  if (!vertexIDs.isEmpty()) {
    for (Integer vertexID : vertexIDs) {
      Vertex vertex = getVertexFromIndex(dag, vertexID);
      if (vertex == null) {
        continue;
      }
      if (addTasksUpToLimit(tasks, vertex, limit)) {
        break;
      }
    }
    return tasks;
  }

  // Neither task nor vertex ids given: take tasks from every vertex of the dag.
  for (Vertex vertex : dag.getVertices().values()) {
    if (addTasksUpToLimit(tasks, vertex, limit)) {
      break;
    }
  }
  return tasks;
}

/**
 * Appends tasks of {@code vertex} to {@code tasks} without letting it grow beyond {@code limit}.
 *
 * @return {@code true} once {@code tasks} holds at least {@code limit} entries
 */
private static boolean addTasksUpToLimit(List<Task> tasks, Vertex vertex, int limit) {
  List<Task> vertexTasks = new ArrayList<Task>(vertex.getTasks().values());
  // Clamp at zero so the sub-list bounds stay valid whatever the caller passes.
  int room = Math.max(0, limit - tasks.size());
  tasks.addAll(vertexTasks.subList(0, Math.min(vertexTasks.size(), room)));
  return tasks.size() >= limit;
}
/**
 * Renders the response JSON for the tasksInfo API.
 *
 * <p>The JSON holds an array of task objects (id, progress, status and, when requested,
 * counters) under the key {@code tasks}. The number of tasks is bounded by the limit request
 * parameter, falling back to {@link #MAX_QUERIED} when that parameter is not an integer.
 */
public void getTasksInfo() {
  if (!setupResponse()) {
    return;
  }

  final DAG dag = checkAndGetDAGFromRequest();
  if (dag == null) {
    return;
  }

  int maxTasks;
  try {
    maxTasks = getQueryParamInt(WebUIService.LIMIT);
  } catch (NumberFormatException e) {
    // Ignore: missing or malformed limit falls back to the default
    maxTasks = MAX_QUERIED;
  }

  final List<Task> selected = getRequestedTasks(dag, maxTasks);
  if (selected == null) {
    return;
  }

  final Map<String, Set<String>> requestedCounters = getCounterListFromRequest();

  final ArrayList<Map<String, Object>> tasksInfo = new ArrayList<Map<String, Object>>();
  for (Task task : selected) {
    final Map<String, Object> info = new HashMap<String, Object>();
    info.put("id", task.getTaskId().toString());
    info.put("progress", Float.toString(task.getProgress()));
    info.put("status", task.getState().toString());

    try {
      final Map<String, Map<String, Long>> counterInfo =
          constructCounterMapInfo(task.getCounters(), requestedCounters);
      if (counterInfo != null && !counterInfo.isEmpty()) {
        info.put("counters", counterInfo);
      }
    } catch (LimitExceededException e) {
      // Ignore
      // TODO: add an error message instead for counter key
    }

    tasksInfo.add(info);
  }

  renderJSON(ImmutableMap.of("tasks", tasksInfo));
}
/**
 * Used by getAttemptsInfo. Selects task attempts of the given DAG based on the attempt id
 * request parameter, whose entries have the form {@code vertexIndex_taskIndex_attemptNo}.
 *
 * <p>Entries that do not resolve to a vertex, task or attempt are skipped.
 *
 * @param dag DAG to select attempts from
 * @param limit maximum number of attempts to return
 * @return the matching attempts (empty when no attempt ids are given), or {@code null} after
 *         an error response has been sent because the parameter could not be parsed
 */
List<TaskAttempt> getRequestedAttempts(DAG dag, Integer limit) {
  final List<List<Integer>> attemptIDs =
      getIDsFromRequest(WebUIService.ATTEMPT_ID, limit, 3);
  if (attemptIDs == null) {
    return null;
  }

  final List<TaskAttempt> selected = new ArrayList<TaskAttempt>();
  for (List<Integer> indexes : attemptIDs) {
    final Vertex vertex = getVertexFromIndex(dag, indexes.get(0));
    if (vertex == null) {
      continue;
    }

    final Task task = vertex.getTask(indexes.get(1));
    if (task == null) {
      continue;
    }

    final TaskAttempt attempt =
        task.getAttempt(TezTaskAttemptID.getInstance(task.getTaskId(), indexes.get(2)));
    if (attempt == null) {
      continue;
    }

    selected.add(attempt);
    if (selected.size() >= limit) {
      break;
    }
  }
  return selected;
}
/**
 * Renders the response JSON for the attemptsInfo API.
 *
 * <p>The JSON holds an array of attempt objects (id, progress, status and, when requested,
 * counters) under the key {@code attempts}. The number of attempts is bounded by the limit
 * request parameter, falling back to {@link #MAX_QUERIED} when that parameter is not an
 * integer.
 */
public void getAttemptsInfo() {
  if (!setupResponse()) {
    return;
  }

  final DAG dag = checkAndGetDAGFromRequest();
  if (dag == null) {
    return;
  }

  int maxAttempts;
  try {
    maxAttempts = getQueryParamInt(WebUIService.LIMIT);
  } catch (NumberFormatException e) {
    // Ignore: missing or malformed limit falls back to the default
    maxAttempts = MAX_QUERIED;
  }

  final List<TaskAttempt> selected = getRequestedAttempts(dag, maxAttempts);
  if (selected == null) {
    return;
  }

  final Map<String, Set<String>> requestedCounters = getCounterListFromRequest();

  final ArrayList<Map<String, Object>> attemptsInfo = new ArrayList<Map<String, Object>>();
  for (TaskAttempt attempt : selected) {
    final Map<String, Object> info = new HashMap<String, Object>();
    info.put("id", attempt.getID().toString());
    info.put("progress", Float.toString(attempt.getProgress()));
    info.put("status", attempt.getState().toString());

    try {
      final Map<String, Map<String, Long>> counterInfo =
          constructCounterMapInfo(attempt.getCounters(), requestedCounters);
      if (counterInfo != null && !counterInfo.isEmpty()) {
        info.put("counters", counterInfo);
      }
    } catch (LimitExceededException e) {
      // Ignore
      // TODO: add an error message instead for counter key
    }

    attemptsInfo.add(info);
  }

  renderJSON(ImmutableMap.of("attempts", attemptsInfo));
}
/**
 * Delegates to the superclass implementation. Redeclared as public and marked
 * {@code @VisibleForTesting}.
 * NOTE(review): presumably so tests can intercept the rendered object - confirm.
 */
@Override
@VisibleForTesting
public void renderJSON(Object object) {
super.renderJSON(object);
}
/**
 * HTML view served by {@link AMWebController#ui()}.
 *
 * <p>When a Tez UI history URL is injected, the page links to it and redirects the browser
 * there via script; otherwise it explains which configuration key has to be set.
 */
public static class StaticAMView extends View {
  @Inject
  AppContext appContext;
  @Inject
  @Named("TezUIHistoryURL") String historyUrl;

  /**
   * Writes the complete HTML page to the response writer and flushes it.
   */
  @Override
  public void render() {
    response().setContentType(MimeType.HTML);

    final PrintWriter out = writer();
    out.write("<html>");
    out.write("<head>");
    out.write("<meta charset=\"utf-8\">");
    out.write("<title>Redirecting to Tez UI</title>");
    out.write("</head>");
    out.write("<body>");

    final boolean haveHistoryUrl = historyUrl != null && !historyUrl.isEmpty();
    if (haveHistoryUrl) {
      final String fallbackLink =
          "<h1>Redirecting to Tez UI</h1>. <p>If you are not redirected shortly, click " +
          "<a href='" + historyUrl + "'><b>here</b></a></p>";
      out.write(fallbackLink);

      final String redirectScript =
          "<script type='text/javascript'>setTimeout(function() { " +
          "window.location.replace('" + historyUrl + "');" +
          "}, 0); </script>";
      out.write(redirectScript);
    } else {
      out.write("<h1>Tez UI Url is not defined.</h1>" +
          "<p>To enable tracking url pointing to Tez UI, set the config <b>" +
          TezConfiguration.TEZ_HISTORY_URL_BASE + "</b> in the tez-site.xml.</p>");
    }

    out.write("</body>");
    out.write("</html>");
    out.flush();
  }
}
/**
 * Immutable id/progress pair rendered as JSON by the progress endpoints.
 */
@VisibleForTesting
static class ProgressInfo {
  private final String id;
  private final float progress;

  /**
   * @param id string form of the DAG or vertex id the progress belongs to
   * @param progress progress value reported for that id
   */
  public ProgressInfo(String id, float progress) {
    this.id = id;
    this.progress = progress;
  }

  /** @return the progress value passed to the constructor */
  public float getProgress() {
    return this.progress;
  }

  /** @return the id passed to the constructor */
  public String getId() {
    return this.id;
  }
}
}