blob: 9cb2510c45a35fc113fd6fe8d98a6ca5cb42ebb4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.jpm.mr.running.storm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.eagle.common.DateTimeUtil;
import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
import org.apache.eagle.jpm.mr.runningentity.AppStreamInfo;
import org.apache.eagle.jpm.mr.runningentity.YarnAppAPIEntity;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
import org.apache.eagle.jpm.util.resourcefetch.url.URLUtil;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericMetricEntity;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.eagle.jpm.mr.runningentity.AppStreamInfo.convertAppToStream;
public class MRRunningAppMetricBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(MRRunningAppMetricBolt.class);
private MRRunningJobConfig config;
private IEagleServiceClient client;
private RMResourceFetcher fetcher;
private OutputCollector collector;
private String site;
private static final long AGGREGATE_INTERVAL = DateTimeUtil.ONEMINUTE;
private static final String USER_TAG = "user";
private static final String QUEUE_TAG = "queue";
private static final String SITE_TAG = "site";
@SuppressWarnings("serial")
public static HashMap<String, String> metrics = new HashMap<String, String>() {
{
put(Constants.MetricName.HADOOP_APPS_ALLOCATED_MB, "getAllocatedMB");
put(Constants.MetricName.HADOOP_APPS_ALLOCATED_VCORES, "getAllocatedVCores");
put(Constants.MetricName.HADOOP_APPS_RUNNING_CONTAINERS, "getRunningContainers");
}
};
public MRRunningAppMetricBolt(MRRunningJobConfig config) {
this.config = config;
this.site = config.getConfig().getString("siteId");
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.client = new EagleServiceClientImpl(config.getConfig());
this.fetcher = new RMResourceFetcher(config.getEndpointConfig().rmUrls);
}
@Override
public void execute(Tuple input) {
List<AppInfo> runningApps = (List<AppInfo>) input.getValue(0);
if (runningApps == null || runningApps.isEmpty()) {
LOG.warn("App list is empty");
}
try {
Map<String, GenericMetricEntity> appMetrics = parseRunningAppMetrics(runningApps);
List<YarnAppAPIEntity> acceptedApps = parseAcceptedApp();
flush(appMetrics, acceptedApps);
} catch (Exception e) {
LOG.error("Fetal error is caught {}", e.getMessage(), e);
}
}
private void createMetric(Map<String, GenericMetricEntity> appMetricEntities,
long timestamp, Map<String, String> tags, String metricName, int value) {
String key = metricName + tags.toString() + " " + timestamp;
GenericMetricEntity entity = appMetricEntities.get(key);
if (entity == null) {
entity = new GenericMetricEntity();
entity.setTags(tags);
entity.setTimestamp(timestamp);
entity.setPrefix(metricName);
entity.setValue(new double[] {0.0});
appMetricEntities.put(key, entity);
}
double lastValue = entity.getValue()[0];
entity.setValue(new double[] {lastValue + value});
}
private Map<String, String> generateMetricTags(AggLevel level, AppInfo app) {
Map<String, String> tags = new HashMap<>();
tags.put(SITE_TAG, site);
switch (level) {
case CLUSTER : break;
case QUEUE :
tags.put(QUEUE_TAG, app.getQueue());
break;
case USER :
tags.put(USER_TAG, app.getUser());
break;
default :
LOG.warn("Unsupported Aggregation Level {}", level);
}
return tags;
}
public Map<String, GenericMetricEntity> parseRunningAppMetrics(List<AppInfo> runningApps) throws Exception {
long timestamp = System.currentTimeMillis() / AGGREGATE_INTERVAL * AGGREGATE_INTERVAL;
Map<String, GenericMetricEntity> appMetricEntities = new HashMap<>();
for (AppInfo app : runningApps) {
for (AggLevel level : AggLevel.values()) {
Map<String, String> tags = generateMetricTags(level, app);
for (java.util.Map.Entry<String, String> entry : metrics.entrySet()) {
Method method = AppInfo.class.getMethod(entry.getValue());
Integer value = Integer.valueOf(method.invoke(app).toString());
String metricName = String.format(entry.getKey(), level.name);
createMetric(appMetricEntities, timestamp, tags, metricName, value);
}
}
}
return appMetricEntities;
}
public List<YarnAppAPIEntity> parseAcceptedApp() {
List<YarnAppAPIEntity> acceptedApps = new ArrayList<>();
try {
List<AppInfo> apps = fetcher.getResource(Constants.ResourceType.ACCEPTED_JOB,
config.getEndpointConfig().limitPerRequest);
if (apps != null) {
LOG.info("successfully fetch {} accepted jobs from {}", apps.size(), fetcher.getSelector().getSelectedUrl());
for (AppInfo app : apps) {
Map<String, String> tags = new HashMap<>();
tags.put(AppStreamInfo.SITE, config.getConfig().getString("siteId"));
tags.put(AppStreamInfo.ID, app.getId());
tags.put(AppStreamInfo.QUEUE, app.getQueue());
tags.put(AppStreamInfo.USER, app.getUser());
YarnAppAPIEntity appAPIEntity = new YarnAppAPIEntity();
appAPIEntity.setTags(tags);
appAPIEntity.setTrackingUrl(buildAcceptedAppTrackingURL(app.getId()));
appAPIEntity.setAppName(app.getName());
appAPIEntity.setClusterUsagePercentage(app.getClusterUsagePercentage());
appAPIEntity.setQueueUsagePercentage(app.getQueueUsagePercentage());
appAPIEntity.setElapsedTime(app.getElapsedTime());
appAPIEntity.setStartedTime(app.getStartedTime());
appAPIEntity.setState(app.getState());
appAPIEntity.setTimestamp(app.getStartedTime());
acceptedApps.add(appAPIEntity);
collector.emit(new Values("", convertAppToStream(appAPIEntity)));
}
}
} catch (Exception e) {
LOG.error("fetch accepted apps failed {}", e.getMessage(), e);
}
return acceptedApps;
}
private String buildAcceptedAppTrackingURL(String appId) {
String url = URLUtil.removeTrailingSlash(fetcher.getSelector().getSelectedUrl());
return String.format("%s/%s/%s?%s", url, Constants.V2_APPS_URL, appId, Constants.ANONYMOUS_PARAMETER);
}
private void flush(Map<String, GenericMetricEntity> appMetrics, List<YarnAppAPIEntity> acceptedApps) {
List<TaggedLogAPIEntity> entities = new ArrayList<>();
if (appMetrics != null && !appMetrics.isEmpty()) {
LOG.info("crawled {} running app metrics", appMetrics.size());
entities.addAll(appMetrics.values());
}
if (acceptedApps != null && !acceptedApps.isEmpty()) {
LOG.info("crawled {} accepted apps", acceptedApps.size());
//entities.addAll(acceptedApps);
}
try {
client.create(entities);
LOG.info("Successfully create {} metrics", entities.size());
} catch (Exception e) {
LOG.error("Fail to create {} metrics due to {}", entities.size(), e.getMessage(), e);
}
}
private enum AggLevel {
CLUSTER("cluster"), QUEUE("queue"), USER("user");
private String name;
AggLevel(String name) {
this.name = name;
}
}
@Override
public void cleanup() {
if (client != null) {
try {
client.close();
} catch (IOException e) {
LOG.error(e.getMessage());
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("f1", "message"));
}
}