/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.security.hive.jobrunning;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import org.apache.commons.lang.StringUtils;
import org.apache.eagle.dataproc.impl.storm.ValuesArray;
import org.apache.eagle.jpm.util.*;
import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
import org.apache.eagle.jpm.util.resourcefetch.connection.URLConnectionUtils;
import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
import org.apache.eagle.jpm.util.resourcefetch.model.MRJob;
import org.apache.eagle.jpm.util.resourcefetch.model.MRJobsWrapper;
import org.apache.eagle.security.hive.config.RunningJobCrawlConfig;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jsoup.Jsoup;
import org.jsoup.nodes.TextNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.io.InputStream;
import java.net.URLConnection;
import java.util.*;
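
/**
 * Storm spout that polls the YARN ResourceManager for running and recently
 * finished MapReduce applications, fetches each job's configuration, and emits
 * the configuration downstream whenever it contains a Hive query string
 * ({@code Constants.HIVE_QUERY_STRING}), so that Hive query activity can be traced.
 */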
public class HiveJobFetchSpout extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(HiveJobFetchSpout.class);
private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
private RunningJobCrawlConfig crawlConfig;
private SpoutOutputCollector collector;
private JobIdFilter jobFilter;
private RMResourceFetcher rmResourceFetcher;
private static final String XML_HTTP_HEADER = "Accept";
private static final String XML_FORMAT = "application/xml";
private static final int CONNECTION_TIMEOUT = 10000;
private static final int READ_TIMEOUT = 10000;
private Long lastFinishAppTime;
private RunningJobManager runningJobManager;
private int partitionId;
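
    // allow non-standard JSON numbers such as NaN/Infinity when deserializing job data fetched from the RM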
static {
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
}
public HiveJobFetchSpout(RunningJobCrawlConfig crawlConfig) {
this.crawlConfig = crawlConfig;
}
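
    /**
     * Derives a zero-based partition id for this task from its position among all
     * tasks of the same component, so that each spout instance processes a disjoint
     * subset of the job ids.
     */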
private int calculatePartitionId(TopologyContext context) {
int thisGlobalTaskId = context.getThisTaskId();
String componentName = context.getComponentId(thisGlobalTaskId);
List<Integer> globalTaskIds = context.getComponentTasks(componentName);
int index = 0;
for (Integer id : globalTaskIds) {
if (id == thisGlobalTaskId) {
return index;
}
index++;
}
throw new IllegalStateException();
}
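
    /**
     * Sets up the RM resource fetcher, the partition-based job id filter, and the
     * ZooKeeper-backed {@link RunningJobManager}, then recovers the last finished
     * application timestamp for this partition (defaulting to one day ago).
     */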
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.rmResourceFetcher = new RMResourceFetcher(crawlConfig.endPointConfig.RMBasePaths);
this.partitionId = calculatePartitionId(context);
        // sanity check: 0 <= partitionId <= numTotalPartitions - 1
        if (partitionId < 0 || partitionId >= crawlConfig.controlConfig.numTotalPartitions) {
            throw new IllegalStateException("partitionId " + partitionId
                + " must be in the range [0, numTotalPartitions) where numTotalPartitions is "
                + crawlConfig.controlConfig.numTotalPartitions);
        }
Class<? extends JobIdPartitioner> partitionerCls = crawlConfig.controlConfig.partitionerCls;
try {
this.jobFilter = new JobIdFilterByPartition(partitionerCls.newInstance(),
crawlConfig.controlConfig.numTotalPartitions, partitionId);
} catch (Exception e) {
            LOG.error("failed to instantiate job partitioner class " + partitionerCls.getCanonicalName(), e);
throw new IllegalStateException(e);
}
this.collector = collector;
this.runningJobManager = new RunningJobManager(crawlConfig.zkStateConfig.zkQuorum,
crawlConfig.zkStateConfig.zkSessionTimeoutMs,
crawlConfig.zkStateConfig.zkRetryTimes,
crawlConfig.zkStateConfig.zkRetryInterval,
crawlConfig.zkStateConfig.zkRoot,
crawlConfig.zkStateConfig.zkLockPath);
this.lastFinishAppTime = this.runningJobManager.recoverLastFinishedTime(partitionId);
if (this.lastFinishAppTime == 0L) {
            this.lastFinishAppTime = Calendar.getInstance().getTimeInMillis() - 24 * 60 * 60 * 1000L; // one day ago
this.runningJobManager.updateLastFinishTime(partitionId, this.lastFinishAppTime);
}
}
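
    /**
     * Each pass fetches the currently running MR applications; completed applications
     * are additionally fetched at most once per minute, using the recovered
     * last-finished timestamp as the lower bound.
     */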
@Override
public void nextTuple() {
LOG.info("start to fetch job list");
try {
List<AppInfo> apps = rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB);
if (apps == null) {
apps = new ArrayList<>();
}
handleApps(apps, true);
long fetchTime = Calendar.getInstance().getTimeInMillis();
if (fetchTime - this.lastFinishAppTime > 60000L) {
apps = rmResourceFetcher.getResource(Constants.ResourceType.COMPLETE_MR_JOB, Long.toString(this.lastFinishAppTime));
if (apps == null) {
apps = new ArrayList<>();
}
handleApps(apps, false);
this.lastFinishAppTime = fetchTime;
this.runningJobManager.updateLastFinishTime(partitionId, fetchTime);
}
        } catch (Exception e) {
            LOG.warn("exception found when fetching job list", e);
        } finally {
            // fetch interval; should be made configurable
            Utils.sleep(60);
}
}
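
    /**
     * For running applications, the job list and configuration are read from the
     * application's tracking URL; for finished applications, the application id is
     * mapped to a job id and the configuration is read from the JobHistory server.
     */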
private void handleApps(List<AppInfo> apps, boolean isRunning) {
List<MRJob> mrJobs = new ArrayList<>();
//fetch job config
if (isRunning) {
for (AppInfo appInfo : apps) {
if (!jobFilter.accept(appInfo.getId())) {
continue;
}
String jobURL = appInfo.getTrackingUrl() + Constants.MR_JOBS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
InputStream is = null;
try {
is = InputStreamUtils.getInputStream(jobURL, null, Constants.CompressionType.NONE);
LOG.info("fetch mr job from {}", jobURL);
mrJobs = OBJ_MAPPER.readValue(is, MRJobsWrapper.class).getJobs().getJob();
} catch (Exception e) {
                    LOG.warn("fetch mr job from {} failed", jobURL, e);
continue;
} finally {
Utils.closeInputStream(is);
}
                fetchRunningConfig(appInfo, mrJobs);
}
        } else {
for (AppInfo appInfo : apps) {
if (!jobFilter.accept(appInfo.getId())) {
continue;
}
MRJob mrJob = new MRJob();
mrJob.setId(appInfo.getId().replace("application_", "job_"));
mrJobs.add(mrJob);
fetchFinishedConfig(appInfo, mrJobs);
}
}
}
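
    /**
     * Reads each running job's configuration as XML from its tracking URL and emits
     * it when it contains a Hive query string. Returns false on the first fetch or
     * parse failure, true otherwise.
     */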
private boolean fetchRunningConfig(AppInfo appInfo, List<MRJob> mrJobs) {
InputStream is = null;
for (MRJob mrJob : mrJobs) {
String confURL = appInfo.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + mrJob.getId() + "/" + Constants.MR_CONF_URL + "?" + Constants.ANONYMOUS_PARAMETER;
try {
LOG.info("fetch job conf from {}", confURL);
final URLConnection connection = URLConnectionUtils.getConnection(confURL);
connection.setRequestProperty(XML_HTTP_HEADER, XML_FORMAT);
connection.setConnectTimeout(CONNECTION_TIMEOUT);
connection.setReadTimeout(READ_TIMEOUT);
is = connection.getInputStream();
Map<String, String> hiveQueryLog = new HashMap<>();
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
dbf.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, Boolean.TRUE);
DocumentBuilder db = dbf.newDocumentBuilder();
Document dt = db.parse(is);
Element element = dt.getDocumentElement();
NodeList propertyList = element.getElementsByTagName("property");
int length = propertyList.getLength();
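                // each <property> element is expected to expose <name> and <value> as its first two child nodes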
for (int i = 0; i < length; i++) {
Node property = propertyList.item(i);
String key = property.getChildNodes().item(0).getTextContent();
String value = property.getChildNodes().item(1).getTextContent();
hiveQueryLog.put(key, value);
}
if (hiveQueryLog.containsKey(Constants.HIVE_QUERY_STRING)) {
collector.emit(new ValuesArray(appInfo.getUser(), mrJob.getId(), Constants.ResourceType.JOB_CONFIGURATION, hiveQueryLog), mrJob.getId());
}
            } catch (Exception e) {
                LOG.warn("fetch job conf from {} failed", confURL, e);
                return false;
} finally {
Utils.closeInputStream(is);
}
}
return true;
}
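
    /**
     * Scrapes each finished job's configuration from the JobHistory server's HTML
     * conf page and emits it when it contains a Hive query string. Returns false on
     * the first fetch or parse failure, true otherwise.
     */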
private boolean fetchFinishedConfig(AppInfo appInfo, List<MRJob> mrJobs) {
InputStream is = null;
for (MRJob mrJob : mrJobs) {
String urlString = crawlConfig.endPointConfig.HSBasePath + "jobhistory/conf/" + mrJob.getId() + "?" + Constants.ANONYMOUS_PARAMETER;
try {
LOG.info("fetch job conf from {}", urlString);
is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.NONE);
final org.jsoup.nodes.Document doc = Jsoup.parse(is, "UTF-8", urlString);
doc.outputSettings().prettyPrint(false);
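                // the JobHistory server renders the configuration as an HTML table with id "conf":
                // each row holds the property name in the first cell and its value in the second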
org.jsoup.select.Elements elements = doc.select("table[id=conf]").select("tbody").select("tr");
Map<String, String> hiveQueryLog = new HashMap<>();
                for (org.jsoup.nodes.Element element : elements) {
org.jsoup.select.Elements tds = element.children();
String key = tds.get(0).text();
String value = "";
org.jsoup.nodes.Element valueElement = tds.get(1);
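                    // keep the Hive query's raw text (including line breaks) via getWholeText(),
                    // since jsoup's text() would collapse its whitespace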
if (Constants.HIVE_QUERY_STRING.equals(key)) {
for (org.jsoup.nodes.Node child : valueElement.childNodes()) {
if (child instanceof TextNode) {
TextNode valueTextNode = (TextNode) child;
value = valueTextNode.getWholeText();
value = StringUtils.strip(value);
}
}
} else {
value = valueElement.text();
}
hiveQueryLog.put(key, value);
}
if (hiveQueryLog.containsKey(Constants.HIVE_QUERY_STRING)) {
collector.emit(new ValuesArray(appInfo.getUser(), mrJob.getId(), Constants.ResourceType.JOB_CONFIGURATION, hiveQueryLog), mrJob.getId());
}
            } catch (Exception e) {
                LOG.warn("fetch job conf from {} failed", urlString, e);
                return false;
} finally {
Utils.closeInputStream(is);
}
}
return true;
}
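
    /** The declared fields match the ValuesArray emitted above: (user, jobId, type, config). */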
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("user", "jobId", "type", "config"));
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
        // TODO: handle failover later
}
}