blob: d4bebc819b502ffb47019a6a43a1479db8acc685 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.jpm.mr.running.parser;
import com.typesafe.config.Config;
import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
import org.apache.eagle.jpm.mr.runningentity.JobConfig;
import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
import org.apache.eagle.jpm.mr.runningentity.TaskAttemptExecutionAPIEntity;
import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.JobNameNormalization;
import org.apache.eagle.jpm.util.MRJobTagName;
import org.apache.eagle.jpm.util.Utils;
import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
import org.apache.eagle.jpm.util.resourcefetch.connection.URLConnectionUtils;
import org.apache.eagle.jpm.util.resourcefetch.model.*;
import org.apache.eagle.jpm.util.resourcefetch.model.JobCounters;
import org.apache.commons.lang3.tuple.Pair;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import java.io.InputStream;
import java.net.URLConnection;
import java.util.*;
import java.util.function.Function;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
public class MRJobParser implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(MRJobParser.class);
public enum ParserStatus {
RUNNING,
FINISHED,
APP_FINISHED
}
private AppInfo app;
private static final int MAX_RETRY_TIMES = 2;
private MRJobEntityCreationHandler mrJobEntityCreationHandler;
//<jobId, JobExecutionAPIEntity>
private Map<String, JobExecutionAPIEntity> mrJobEntityMap;
private Map<String, JobConfig> mrJobConfigs;
private static final String XML_HTTP_HEADER = "Accept";
private static final String XML_FORMAT = "application/xml";
private static final int CONNECTION_TIMEOUT = 10000;
private static final int READ_TIMEOUT = 10000;
private final Object lock = new Object();
private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
private Map<String, String> commonTags = new HashMap<>();
private MRRunningJobManager runningJobManager;
private ParserStatus parserStatus;
private Set<String> finishedTaskIds;
private List<String> configKeys;
private static final int TOP_BOTTOM_TASKS_BY_ELAPSED_TIME = 10;
private static final int FLUSH_TASKS_EVERY_TIME = 5;
private static final int MAX_TASKS_PERMIT = 5000;
private Config config;
static {
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
}
public MRJobParser(MRRunningJobConfig.EndpointConfig endpointConfig,
MRRunningJobConfig.EagleServiceConfig eagleServiceConfig,
AppInfo app, Map<String, JobExecutionAPIEntity> mrJobMap,
MRRunningJobManager runningJobManager,
List<String> configKeys,
Config config) {
this.app = app;
if (mrJobMap == null) {
this.mrJobEntityMap = new HashMap<>();
} else {
this.mrJobEntityMap = mrJobMap;
}
this.mrJobConfigs = new HashMap<>();
this.mrJobEntityCreationHandler = new MRJobEntityCreationHandler(eagleServiceConfig);
this.commonTags.put(MRJobTagName.SITE.toString(), endpointConfig.site);
this.commonTags.put(MRJobTagName.USER.toString(), app.getUser());
this.commonTags.put(MRJobTagName.JOB_QUEUE.toString(), app.getQueue());
this.runningJobManager = runningJobManager;
this.parserStatus = ParserStatus.FINISHED;
this.finishedTaskIds = new HashSet<>();
this.configKeys = configKeys;
this.config = config;
}
public void setAppInfo(AppInfo app) {
this.app = app;
}
public ParserStatus status() {
return this.parserStatus;
}
public void setStatus(ParserStatus status) {
this.parserStatus = status;
}
private void finishMRJob(String mrJobId) {
JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(mrJobId);
jobExecutionAPIEntity.setInternalState(Constants.AppState.FINISHED.toString());
jobExecutionAPIEntity.setCurrentState(Constants.AppState.RUNNING.toString());
// set an estimated job finished time because it's hard the get the specific one
jobExecutionAPIEntity.setEndTime(System.currentTimeMillis());
mrJobConfigs.remove(mrJobId);
if (mrJobConfigs.size() == 0) {
this.parserStatus = ParserStatus.APP_FINISHED;
}
LOG.info("mr job {} has been finished", mrJobId);
}
private void fetchMRRunningInfo() throws Exception {
for (int i = 0; i < MAX_RETRY_TIMES; i++) {
if (fetchMRJobs()) {
break;
} else if (i >= MAX_RETRY_TIMES - 1) {
if (app.getState().equals(Constants.AppState.FINISHED.toString())) {
mrJobEntityMap.keySet().forEach(this::finishMRJob);
}
return;
}
}
List<Function<String, Boolean>> functions = new ArrayList<>();
functions.add(fetchJobConfig);
functions.add(fetchJobCounters);
if ((int) (Math.random() * 10) % FLUSH_TASKS_EVERY_TIME == 0) {
functions.add(fetchTasks);
}
for (String jobId : mrJobEntityMap.keySet()) {
for (Function<String, Boolean> function : functions) {
int i = 0;
for (; i < MAX_RETRY_TIMES; i++) {
if (function.apply(jobId)) {
break;
}
}
if (i >= MAX_RETRY_TIMES) {
break;
}
}
}
}
private boolean fetchMRJobs() {
String jobURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
InputStream is = null;
List<MRJob> mrJobs = null;
try {
is = InputStreamUtils.getInputStream(jobURL, null, Constants.CompressionType.NONE);
LOG.info("fetch mr job from {}", jobURL);
mrJobs = OBJ_MAPPER.readValue(is, MRJobsWrapper.class).getJobs().getJob();
} catch (Exception e) {
LOG.warn("fetch mr job from {} failed, {}", jobURL, e.getMessage());
return false;
} finally {
Utils.closeInputStream(is);
}
for (MRJob mrJob : mrJobs) {
String id = mrJob.getId();
if (!mrJobEntityMap.containsKey(id)) {
mrJobEntityMap.put(id, new JobExecutionAPIEntity());
}
JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(id);
jobExecutionAPIEntity.setTags(new HashMap<>(commonTags));
jobExecutionAPIEntity.getTags().put(MRJobTagName.JOB_ID.toString(), id);
jobExecutionAPIEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), mrJob.getName());
String jobDefId = JobNameNormalization.getInstance(this.config).normalize(mrJob.getName());
jobExecutionAPIEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId);
if (mrJobConfigs.get(id) != null) {
JobConfig jobConfig = mrJobConfigs.get(id);
if (jobConfig.containsKey(this.configKeys.get(0))) {
jobExecutionAPIEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), jobConfig.get(this.configKeys.get(0)));
}
}
jobExecutionAPIEntity.setTimestamp(app.getStartedTime());
jobExecutionAPIEntity.setSubmissionTime(app.getStartedTime());
jobExecutionAPIEntity.setTrackingUrl(app.getTrackingUrl());
jobExecutionAPIEntity.setStartTime(mrJob.getStartTime());
jobExecutionAPIEntity.setDurationTime(mrJob.getElapsedTime());
jobExecutionAPIEntity.setCurrentState(mrJob.getState());
jobExecutionAPIEntity.setInternalState(mrJob.getState());
jobExecutionAPIEntity.setNumTotalMaps(mrJob.getMapsTotal());
jobExecutionAPIEntity.setNumFinishedMaps(mrJob.getMapsCompleted());
jobExecutionAPIEntity.setNumTotalReduces(mrJob.getReducesTotal());
jobExecutionAPIEntity.setNumFinishedReduces(mrJob.getReducesCompleted());
jobExecutionAPIEntity.setMapProgress(mrJob.getMapProgress());
jobExecutionAPIEntity.setReduceProgress(mrJob.getReduceProgress());
jobExecutionAPIEntity.setMapsPending(mrJob.getMapsPending());
jobExecutionAPIEntity.setMapsRunning(mrJob.getMapsRunning());
jobExecutionAPIEntity.setReducesPending(mrJob.getReducesPending());
jobExecutionAPIEntity.setReducesRunning(mrJob.getReducesRunning());
jobExecutionAPIEntity.setNewReduceAttempts(mrJob.getNewReduceAttempts());
jobExecutionAPIEntity.setRunningReduceAttempts(mrJob.getRunningReduceAttempts());
jobExecutionAPIEntity.setFailedReduceAttempts(mrJob.getFailedReduceAttempts());
jobExecutionAPIEntity.setKilledReduceAttempts(mrJob.getKilledReduceAttempts());
jobExecutionAPIEntity.setSuccessfulReduceAttempts(mrJob.getSuccessfulReduceAttempts());
jobExecutionAPIEntity.setNewMapAttempts(mrJob.getNewMapAttempts());
jobExecutionAPIEntity.setRunningMapAttempts(mrJob.getRunningMapAttempts());
jobExecutionAPIEntity.setFailedMapAttempts(mrJob.getFailedMapAttempts());
jobExecutionAPIEntity.setKilledMapAttempts(mrJob.getKilledMapAttempts());
jobExecutionAPIEntity.setSuccessfulMapAttempts(mrJob.getSuccessfulMapAttempts());
jobExecutionAPIEntity.setAppInfo(app);
jobExecutionAPIEntity.setAllocatedMB(app.getAllocatedMB());
jobExecutionAPIEntity.setAllocatedVCores(app.getAllocatedVCores());
jobExecutionAPIEntity.setRunningContainers(app.getRunningContainers());
}
return true;
}
private Function<String, Boolean> fetchJobCounters = jobId -> {
String jobCounterURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + jobId + "/" + Constants.MR_JOB_COUNTERS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
InputStream is = null;
JobCounters jobCounters = null;
try {
is = InputStreamUtils.getInputStream(jobCounterURL, null, Constants.CompressionType.NONE);
LOG.info("fetch mr job counter from {}", jobCounterURL);
jobCounters = OBJ_MAPPER.readValue(is, JobCountersWrapper.class).getJobCounters();
} catch (Exception e) {
LOG.warn("fail to fetch mr job counter from {}, {}", jobCounterURL, e.getMessage());
return false;
} finally {
Utils.closeInputStream(is);
}
if (jobCounters.getCounterGroup() == null) {
return true;
}
JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(jobId);
org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounter = new org.apache.eagle.jpm.util.jobcounter.JobCounters();
Map<String, Map<String, Long>> groups = new HashMap<>();
for (JobCounterGroup jobCounterGroup : jobCounters.getCounterGroup()) {
String counterGroupName = jobCounterGroup.getCounterGroupName();
if (!groups.containsKey(counterGroupName)) {
groups.put(counterGroupName, new HashMap<>());
}
Map<String, Long> counterValues = groups.get(counterGroupName);
List<JobCounterItem> items = jobCounterGroup.getCounter();
if (items == null) {
continue;
}
for (JobCounterItem item : items) {
String key = item.getName();
counterValues.put(key, item.getTotalCounterValue());
if (counterGroupName.equals(Constants.JOB_COUNTER)) {
if (key.equals(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) {
jobExecutionAPIEntity.setDataLocalMaps((int) item.getTotalCounterValue());
} else if (key.equals(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) {
jobExecutionAPIEntity.setRackLocalMaps((int) item.getTotalCounterValue());
} else if (key.equals(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) {
jobExecutionAPIEntity.setTotalLaunchedMaps((int) item.getTotalCounterValue());
}
}
}
}
jobCounter.setCounters(groups);
jobExecutionAPIEntity.setJobCounters(jobCounter);
if (jobExecutionAPIEntity.getTotalLaunchedMaps() > 0) {
jobExecutionAPIEntity.setDataLocalMapsPercentage(jobExecutionAPIEntity.getDataLocalMaps() * 1.0 / jobExecutionAPIEntity.getTotalLaunchedMaps());
jobExecutionAPIEntity.setRackLocalMapsPercentage(jobExecutionAPIEntity.getRackLocalMaps() * 1.0 / jobExecutionAPIEntity.getTotalLaunchedMaps());
}
return true;
};
private Function<Pair<String, String>, org.apache.eagle.jpm.util.jobcounter.JobCounters> fetchTaskCounters = jobAndTaskId -> {
org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounter = new org.apache.eagle.jpm.util.jobcounter.JobCounters();
String jobId = jobAndTaskId.getLeft();
String taskId = jobAndTaskId.getRight();
String taskCounterURL = app.getTrackingUrl()
+ Constants.MR_JOBS_URL + "/"
+ jobId + "/" + Constants.MR_TASKS_URL + "/"
+ taskId + "/" + Constants.MR_JOB_COUNTERS_URL
+ "?" + Constants.ANONYMOUS_PARAMETER;
InputStream is = null;
TaskCounters taskCounters = null;
try {
is = InputStreamUtils.getInputStream(taskCounterURL, null, Constants.CompressionType.NONE);
LOG.info("fetch mr task counters from {}", taskCounterURL);
taskCounters = OBJ_MAPPER.readValue(is, TaskCountersWrapper.class).getJobTaskCounters();
} catch (Exception e) {
LOG.warn("fetch mr task counters from {} failed, {}", taskCounterURL, e);
return null;
} finally {
Utils.closeInputStream(is);
}
if (taskCounters.getTaskCounterGroup() == null) {
return jobCounter;
}
Map<String, Map<String, Long>> groups = new HashMap<>();
for (TaskCounterGroup taskCounterGroup : taskCounters.getTaskCounterGroup()) {
if (!groups.containsKey(taskCounterGroup.getCounterGroupName())) {
groups.put(taskCounterGroup.getCounterGroupName(), new HashMap<>());
}
Map<String, Long> counterValues = groups.get(taskCounterGroup.getCounterGroupName());
List<TaskCounterItem> items = taskCounterGroup.getCounter();
if (items == null) {
continue;
}
for (TaskCounterItem item : items) {
counterValues.put(item.getName(), item.getValue());
}
}
jobCounter.setCounters(groups);
return jobCounter;
};
private Function<Pair<String, String>, TaskAttemptExecutionAPIEntity> fetchTaskAttempt = jobAndTaskId -> {
String jobId = jobAndTaskId.getLeft();
String taskId = jobAndTaskId.getRight();
String taskAttemptURL = app.getTrackingUrl()
+ Constants.MR_JOBS_URL + "/"
+ jobId + "/" + Constants.MR_TASKS_URL + "/"
+ taskId + "/" + Constants.MR_TASK_ATTEMPTS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
InputStream is = null;
List<MRTaskAttempt> taskAttempts = null;
try {
is = InputStreamUtils.getInputStream(taskAttemptURL, null, Constants.CompressionType.GZIP);
LOG.info("fetch mr task attempt from {}", taskAttemptURL);
taskAttempts = OBJ_MAPPER.readValue(is, MRTaskAttemptWrapper.class).getTaskAttempts().getTaskAttempt();
} catch (Exception e) {
LOG.warn("fetch mr task attempt from {} failed, {}", taskAttemptURL, e);
return null;
} finally {
Utils.closeInputStream(is);
}
Comparator<MRTaskAttempt> byStartTime = (e1, e2) -> -1 * Long.compare(e1.getStartTime(), e2.getStartTime());
Iterator<MRTaskAttempt> taskAttemptIterator = taskAttempts.stream().sorted(byStartTime).iterator();
while (taskAttemptIterator.hasNext()) {
MRTaskAttempt mrTaskAttempt = taskAttemptIterator.next();
TaskAttemptExecutionAPIEntity taskAttemptExecutionAPIEntity = new TaskAttemptExecutionAPIEntity();
taskAttemptExecutionAPIEntity.setTags(new HashMap<>(mrJobEntityMap.get(jobId).getTags()));
taskAttemptExecutionAPIEntity.getTags().put(MRJobTagName.TASK_TYPE.toString(), mrTaskAttempt.getType());
taskAttemptExecutionAPIEntity.getTags().put(MRJobTagName.TASK_ID.toString(), taskId);
taskAttemptExecutionAPIEntity.getTags().put(MRJobTagName.RACK.toString(), mrTaskAttempt.getRack());
String nodeHttpAddress = mrTaskAttempt.getNodeHttpAddress();
String host = nodeHttpAddress.substring(0, nodeHttpAddress.indexOf(':'));
taskAttemptExecutionAPIEntity.getTags().put(MRJobTagName.HOSTNAME.toString(), host);
taskAttemptExecutionAPIEntity.setTimestamp(mrTaskAttempt.getStartTime());
taskAttemptExecutionAPIEntity.setStartTime(mrTaskAttempt.getStartTime());
taskAttemptExecutionAPIEntity.setFinishTime(mrTaskAttempt.getFinishTime());
taskAttemptExecutionAPIEntity.setElapsedTime(mrTaskAttempt.getElapsedTime());
taskAttemptExecutionAPIEntity.setProgress(mrTaskAttempt.getProgress());
taskAttemptExecutionAPIEntity.setId(mrTaskAttempt.getId());
taskAttemptExecutionAPIEntity.setStatus(mrTaskAttempt.getState());
taskAttemptExecutionAPIEntity.setDiagnostics(mrTaskAttempt.getDiagnostics());
taskAttemptExecutionAPIEntity.setType(mrTaskAttempt.getType());
taskAttemptExecutionAPIEntity.setAssignedContainerId(mrTaskAttempt.getAssignedContainerId());
this.mrJobEntityCreationHandler.add(taskAttemptExecutionAPIEntity);
return taskAttemptExecutionAPIEntity;
}
return null;
};
private void needFetchAttemptTasks(Iterator<MRTask> taskIterator, Set<String> needFetchAttemptTasks) {
int i = 0;
while (taskIterator.hasNext() && i < TOP_BOTTOM_TASKS_BY_ELAPSED_TIME) {
MRTask mrTask = taskIterator.next();
if (mrTask.getElapsedTime() > 0) {
i++;
needFetchAttemptTasks.add(mrTask.getId());
}
}
}
private Set<String> calcFetchCounterAndAttemptTaskId(List<MRTask> tasks) {
Set<String> needFetchAttemptTasks = new HashSet<>();
//1, sort by elapsedTime
Comparator<MRTask> byElapsedTimeIncrease = (e1, e2) -> Long.compare(e1.getElapsedTime(), e2.getElapsedTime());
Comparator<MRTask> byElapsedTimeDecrease = (e1, e2) -> -1 * Long.compare(e1.getElapsedTime(), e2.getElapsedTime());
//2, get finished bottom n
Iterator<MRTask> taskIteratorIncrease = tasks.stream()
.filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
.sorted(byElapsedTimeIncrease).iterator();
needFetchAttemptTasks(taskIteratorIncrease, needFetchAttemptTasks);
//3, fetch finished top n
Iterator<MRTask> taskIteratorDecrease = tasks.stream()
.filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
.sorted(byElapsedTimeDecrease).iterator();
needFetchAttemptTasks(taskIteratorDecrease, needFetchAttemptTasks);
//4, fetch running top n
taskIteratorDecrease = tasks.stream()
.filter(task -> task.getState().equals(Constants.TaskState.RUNNING.toString()))
.sorted(byElapsedTimeDecrease).iterator();
needFetchAttemptTasks(taskIteratorDecrease, needFetchAttemptTasks);
return needFetchAttemptTasks;
}
private Function<String, Boolean> fetchTasks = jobId -> {
try {
JobExecutionAPIEntity entity = this.mrJobEntityMap.get(jobId);
int taskNumber = entity.getNumTotalMaps() + entity.getNumTotalReduces();
if (taskNumber > MAX_TASKS_PERMIT) {
LOG.info("too many tasks {}, ignore tasks", taskNumber);
return true;
}
} catch (Exception e) {
return true;
}
String taskURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + jobId + "/" + Constants.MR_TASKS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
InputStream is = null;
List<MRTask> tasks = null;
try {
is = InputStreamUtils.getInputStream(taskURL, null, Constants.CompressionType.NONE);
LOG.info("fetch mr task from {}", taskURL);
tasks = OBJ_MAPPER.readValue(is, MRTasksWrapper.class).getTasks().getTask();
} catch (Exception e) {
LOG.warn("fetch mr task from {} failed, {}", taskURL, e);
return false;
} finally {
Utils.closeInputStream(is);
}
Set<String> needFetchAttemptTasks = new HashSet<>();//calcFetchCounterAndAttemptTaskId(tasks);
for (MRTask task : tasks) {
if (this.finishedTaskIds.contains(task.getId()) && !needFetchAttemptTasks.contains(task.getId())) {
continue;
}
TaskExecutionAPIEntity taskExecutionAPIEntity = new TaskExecutionAPIEntity();
taskExecutionAPIEntity.setTags(new HashMap<>(mrJobEntityMap.get(jobId).getTags()));
taskExecutionAPIEntity.getTags().put(MRJobTagName.TASK_TYPE.toString(), task.getType());
taskExecutionAPIEntity.getTags().put(MRJobTagName.TASK_ID.toString(), task.getId());
taskExecutionAPIEntity.setTimestamp(app.getStartedTime());
taskExecutionAPIEntity.setStartTime(task.getStartTime());
taskExecutionAPIEntity.setEndTime(task.getFinishTime());
taskExecutionAPIEntity.setDuration(task.getElapsedTime());
taskExecutionAPIEntity.setProgress(task.getProgress());
taskExecutionAPIEntity.setTaskStatus(task.getState());
taskExecutionAPIEntity.setSuccessfulAttempt(task.getSuccessfulAttempt());
taskExecutionAPIEntity.setStatusDesc(task.getStatus());
if (needFetchAttemptTasks.contains(task.getId())) {
taskExecutionAPIEntity.setJobCounters(fetchTaskCounters.apply(Pair.of(jobId, task.getId())));
if (taskExecutionAPIEntity.getJobCounters() == null) {
return false;
}
TaskAttemptExecutionAPIEntity taskAttemptExecutionAPIEntity = fetchTaskAttempt.apply(Pair.of(jobId, task.getId()));
if (taskAttemptExecutionAPIEntity != null) {
taskExecutionAPIEntity.getTags().put(MRJobTagName.HOSTNAME.toString(), taskAttemptExecutionAPIEntity.getTags().get(MRJobTagName.HOSTNAME.toString()));
//taskExecutionAPIEntity.setHost(taskAttemptExecutionAPIEntity.getTags().get(MRJobTagName.HOSTNAME.toString()));
}
}
mrJobEntityCreationHandler.add(taskExecutionAPIEntity);
if (task.getState().equals(Constants.TaskState.SUCCEEDED.toString())
|| task.getState().equals(Constants.TaskState.FAILED.toString())
|| task.getState().equals(Constants.TaskState.KILLED.toString())
|| task.getState().equals(Constants.TaskState.KILL_WAIT.toString())) {
//LOG.info("mr job {} task {} has finished", jobId, task.getId());
this.finishedTaskIds.add(task.getId());
}
}
return true;
};
private Function<String, Boolean> fetchJobConfig = jobId -> {
if (mrJobConfigs.containsKey(jobId)) {
mrJobEntityMap.get(jobId).setJobConfig(mrJobConfigs.get(jobId));
mrJobEntityMap.get(jobId).getTags().put(MRJobTagName.JOB_TYPE.toString(), Utils.fetchJobType(mrJobConfigs.get(jobId)).toString());
return true;
}
String confURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + jobId + "/" + Constants.MR_CONF_URL + "?" + Constants.ANONYMOUS_PARAMETER;
InputStream is = null;
try {
LOG.info("fetch job conf from {}", confURL);
final URLConnection connection = URLConnectionUtils.getConnection(confURL);
connection.setRequestProperty(XML_HTTP_HEADER, XML_FORMAT);
connection.setConnectTimeout(CONNECTION_TIMEOUT);
connection.setReadTimeout(READ_TIMEOUT);
is = connection.getInputStream();
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
dbf.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, Boolean.TRUE);
dbf.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
DocumentBuilder db = dbf.newDocumentBuilder();
Document dt = db.parse(is);
Element element = dt.getDocumentElement();
JobConfig config = new JobConfig();
NodeList propertyList = element.getElementsByTagName("property");
int length = propertyList.getLength();
for (int i = 0; i < length; i++) {
Node property = propertyList.item(i);
String key = property.getChildNodes().item(0).getTextContent();
String value = property.getChildNodes().item(1).getTextContent();
if (this.configKeys.contains(key)) {
config.put(key, value);
}
if (!this.configKeys.isEmpty() && key.equals(this.configKeys.get(0))) {
mrJobEntityMap.get(jobId).getTags().put(MRJobTagName.JOD_DEF_ID.toString(), value);
}
}
mrJobEntityMap.get(jobId).getTags().put(MRJobTagName.JOB_TYPE.toString(), Utils.fetchJobType(config).toString());
mrJobEntityMap.get(jobId).setJobConfig(config);
mrJobConfigs.put(jobId, config);
mrJobEntityCreationHandler.add(mrJobEntityMap.get(jobId));
runningJobManager.update(app.getId(), jobId, mrJobEntityMap.get(jobId));
} catch (Exception e) {
LOG.warn("fetch job conf from {} failed, {}", confURL, e);
return false;
} finally {
Utils.closeInputStream(is);
}
return true;
};
@Override
public void run() {
synchronized (this.lock) {
if (this.parserStatus == ParserStatus.APP_FINISHED) {
return;
}
this.parserStatus = ParserStatus.RUNNING;
LOG.info("start to process yarn application " + app.getId());
try {
fetchMRRunningInfo();
} catch (Exception e) {
LOG.warn("exception found when process application {}, {}", app.getId(), e);
} finally {
for (String jobId : mrJobEntityMap.keySet()) {
JobExecutionAPIEntity entity = mrJobEntityMap.get(jobId);
if (entity.getTags().containsKey(MRJobTagName.JOB_TYPE.toString())) {
mrJobEntityCreationHandler.add(entity);
}
}
if (mrJobEntityCreationHandler.flush()) { //force flush
//we must flush entities before delete from zk in case of missing finish state of jobs
//delete from zk if needed
mrJobEntityMap.keySet()
.stream()
.filter(jobId -> mrJobEntityMap.get(jobId).getInternalState() != null)
.filter(
jobId -> mrJobEntityMap.get(jobId).getInternalState().equals(Constants.AppState.FINISHED.toString())
|| mrJobEntityMap.get(jobId).getInternalState().equals(Constants.AppState.FAILED.toString()))
.forEach(
jobId -> this.runningJobManager.delete(app.getId(), jobId));
}
LOG.info("finish process yarn application " + app.getId());
}
if (this.parserStatus == ParserStatus.RUNNING) {
this.parserStatus = ParserStatus.FINISHED;
}
}
}
private AnalyzerEntity convertToAnalysisEntity(JobExecutionAPIEntity jobExecutionAPIEntity) {
AnalyzerEntity mrJobAnalysisEntity = new AnalyzerEntity();
Map<String, String> tags = jobExecutionAPIEntity.getTags();
mrJobAnalysisEntity.setJobDefId(tags.get(MRJobTagName.JOD_DEF_ID.toString()));
mrJobAnalysisEntity.setJobId(tags.get(MRJobTagName.JOB_ID.toString()));
mrJobAnalysisEntity.setSiteId(tags.get(MRJobTagName.SITE.toString()));
mrJobAnalysisEntity.setUserId(tags.get(MRJobTagName.USER.toString()));
mrJobAnalysisEntity.setStartTime(jobExecutionAPIEntity.getStartTime());
mrJobAnalysisEntity.setEndTime(jobExecutionAPIEntity.getEndTime());
mrJobAnalysisEntity.setDurationTime(jobExecutionAPIEntity.getDurationTime());
mrJobAnalysisEntity.setCurrentState(jobExecutionAPIEntity.getInternalState());
mrJobAnalysisEntity.setJobConfig(new HashMap<>(jobExecutionAPIEntity.getJobConfig()));
mrJobAnalysisEntity.setProgress(this.app.getProgress());
return mrJobAnalysisEntity;
}
}