blob: 3d998e9fd582b58fd9bfd3728b3fe8d1c0c6424b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.ui.model.pages;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.common.metric.window.StatBuckets;
import com.alibaba.jstorm.ui.NimbusClientManager;
import com.alibaba.jstorm.ui.UIDef;
import com.alibaba.jstorm.ui.UIUtils;
import com.alibaba.jstorm.ui.model.ColumnData;
import com.alibaba.jstorm.ui.model.LinkData;
import com.alibaba.jstorm.ui.model.PageGenerator;
import com.alibaba.jstorm.ui.model.PageIndex;
import com.alibaba.jstorm.ui.model.TableData;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.NetWorkUtils;
import backtype.storm.generated.ComponentSummary;
import backtype.storm.generated.ErrorInfo;
import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.MetricWindow;
import backtype.storm.generated.TaskSummary;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
public class ComponentPage implements PageGenerator {
private static final Logger LOG = LoggerFactory
.getLogger(ComponentPage.class);
public String getWindowStr(Integer window) {
return "(" + StatBuckets.prettyUptimeStr(window) + ")";
}
public TableData getComponentSummary(TopologyInfo topologyInfo,
ComponentSummary componentSummary, Map<String, String> paramMap,
Integer window) {
TableData table = new TableData();
List<String> headers = table.getHeaders();
List<Map<String, ColumnData>> lines = table.getLines();
table.setName(componentSummary.get_name() + getWindowStr(window));
headers.add(UIDef.HEADER_COMPONENT_NAME);
headers.add(UIDef.HEADER_TOPOLOGY_NAME);
headers.add(UIDef.HEADER_TASK_NUM);
headers.add(UIDef.HEADER_COMPONENT_TYPE);
Map<String, ColumnData> line = new HashMap<String, ColumnData>();
lines.add(line);
ColumnData nameColumn = new ColumnData();
nameColumn.addText(componentSummary.get_name());
line.put(UIDef.HEADER_COMPONENT_NAME, nameColumn);
ColumnData topologyColumn = new ColumnData();
LinkData linkData = new LinkData();
topologyColumn.addLinkData(linkData);
line.put(UIDef.HEADER_TOPOLOGY_NAME, topologyColumn);
linkData.setUrl(UIDef.LINK_WINDOW_TABLE);
linkData.setText(topologyInfo.get_topology().get_name());
linkData.addParam(UIDef.CLUSTER, paramMap.get(UIDef.CLUSTER));
linkData.addParam(UIDef.PAGE_TYPE, UIDef.PAGE_TYPE_TOPOLOGY);
linkData.addParam(UIDef.TOPOLOGY, topologyInfo.get_topology().get_id());
linkData.addParam(UIDef.WINDOW, StatBuckets.prettyUptimeStr(window));
ColumnData taskNumColumn = new ColumnData();
taskNumColumn.addText(String.valueOf(componentSummary.get_parallel()));
line.put(UIDef.HEADER_TASK_NUM, taskNumColumn);
ColumnData typeColumn = new ColumnData();
typeColumn.addText(componentSummary.get_type());
line.put(UIDef.HEADER_COMPONENT_TYPE, typeColumn);
return table;
}
public TableData getComponentMetrics(TopologyInfo topologyInfo,
ComponentSummary componentSummary, Map<String, String> paramMap,
Integer window) {
MetricInfo metricInfo =
topologyInfo.get_metrics().get_componentMetric()
.get(componentSummary.get_name());
if (metricInfo == null) {
LOG.info("No component metric of " + componentSummary.get_name());
return null;
}
TableData table = UIUtils.getMetricTable(metricInfo, window);
table.setName(componentSummary.get_name() + "-Metrics-"
+ getWindowStr(window));
return table;
}
public TableData getStreamMetrics(
Map<String, Map<String, MetricWindow>> streamMetric, Integer window) {
TableData table = new TableData();
List<String> headers = table.getHeaders();
List<Map<String, ColumnData>> lines = table.getLines();
headers.add(UIDef.HEADER_STREAM_ID);
headers.addAll(UIUtils.getSortedKeys(streamMetric.keySet()));
Map<String, Map<String, ColumnData>> lineMap =
new HashMap<String, Map<String, ColumnData>>();
for (Entry<String, Map<String, MetricWindow>> entry : streamMetric
.entrySet()) {
String metricName = entry.getKey();
Map<String, MetricWindow> streamMetricWindow = entry.getValue();
for (Entry<String, MetricWindow> streamEntry : streamMetricWindow
.entrySet()) {
String streamId = streamEntry.getKey();
MetricWindow metric = streamEntry.getValue();
Map<String, ColumnData> line = lineMap.get(streamId);
if (line == null) {
line = new HashMap<String, ColumnData>();
lineMap.put(streamId, line);
}
String value =
String.valueOf(metric.get_metricWindow().get(window));
ColumnData columnData = new ColumnData();
columnData.addText(value);
line.put(metricName, columnData);
}
}
for (Entry<String, Map<String, ColumnData>> entry : lineMap.entrySet()) {
String streamId = entry.getKey();
Map<String, ColumnData> line = entry.getValue();
lines.add(line);
ColumnData columnData = new ColumnData();
columnData.addText(streamId);
line.put(UIDef.HEADER_STREAM_ID, columnData);
}
UIUtils.complementingTable(table);
return table;
}
public TableData getInputComponentMetrics(TopologyInfo topologyInfo,
ComponentSummary componentSummary, Map<String, String> paramMap,
Integer window) {
String name = componentSummary.get_name();
MetricInfo metric =
topologyInfo.get_metrics().get_componentMetric().get(name);
if (metric == null) {
LOG.info("No component metric of " + name);
return null;
}
Map<String, Map<String, MetricWindow>> input = metric.get_inputMetric();
if (input == null || input.size() == 0) {
LOG.info("No input metric of " + name);
return null;
}
TableData table = getStreamMetrics(input, window);
table.setName("Input stats" + getWindowStr(window));
return table;
}
public TableData getOutputComponentMetrics(TopologyInfo topologyInfo,
ComponentSummary componentSummary, Map<String, String> paramMap,
Integer window) {
String name = componentSummary.get_name();
MetricInfo metric =
topologyInfo.get_metrics().get_componentMetric().get(name);
if (metric == null) {
LOG.info("No component metric of " + componentSummary.get_name());
return null;
}
Map<String, Map<String, MetricWindow>> output =
metric.get_outputMetric();
if (output == null || output.size() == 0) {
LOG.info("No output metric of " + name);
return null;
}
TableData table = getStreamMetrics(output, window);
table.setName("Output stats" + getWindowStr(window));
return table;
}
public TableData getTaskSummary(ComponentSummary componentSummary,
List<TaskSummary> tasks, Map<String, String> paramMap,
String topologyId, Map<String, Object> nimbusConf,
List<TaskSummary> showTasks) {
TableData table = new TableData();
List<String> headers = table.getHeaders();
List<Map<String, ColumnData>> lines = table.getLines();
table.setName("Tasks");
headers.add(UIDef.HEADER_TASK_ID);
headers.add(UIDef.HEADER_STATUS);
headers.add(UIDef.HEADER_UPTIME);
headers.add(StringUtils.capitalize(UIDef.HOST));
headers.add(StringUtils.capitalize(UIDef.PORT));
headers.add(UIDef.HEADER_LOG);
headers.add(UIDef.HEADER_ERROR);
List<Integer> taskIds = componentSummary.get_task_ids();
Set<Integer> taskIdSet = new HashSet<Integer>();
taskIdSet.addAll(taskIds);
long pos = JStormUtils.parseLong(paramMap.get(UIDef.POS), 0);
long index = 0;
Collections.sort(tasks);
for (TaskSummary task : tasks) {
Integer taskId = task.get_task_id();
if (taskIdSet.contains(taskId) == false) {
continue;
}
if (index < pos) {
index ++;
continue;
}else if (pos <= index && index < pos + UIUtils.ONE_TABLE_PAGE_SIZE) {
showTasks.add(task);
taskIdSet.remove(taskId);
index++;
}else {
break;
}
}
int logPort =
ConfigExtension.getSupervisorDeamonHttpserverPort(nimbusConf);
for (TaskSummary task : showTasks) {
Map<String, ColumnData> line = new HashMap<String, ColumnData>();
lines.add(line);
ColumnData taskIdColumn = new ColumnData();
taskIdColumn.addText(String.valueOf(task.get_task_id()));
line.put(UIDef.HEADER_TASK_ID, taskIdColumn);
ColumnData statusColumn = new ColumnData();
statusColumn.addText(task.get_status());
line.put(UIDef.HEADER_STATUS, statusColumn);
ColumnData uptimeColumn = new ColumnData();
int uptime = task.get_uptime();
uptimeColumn.addText(StatBuckets.prettyUptimeStr(uptime));
line.put(UIDef.HEADER_UPTIME, uptimeColumn);
ColumnData hostColumn = new ColumnData();
LinkData linkData = new LinkData();
hostColumn.addLinkData(linkData);
line.put(StringUtils.capitalize(UIDef.HOST), hostColumn);
linkData.setUrl(UIDef.LINK_TABLE_PAGE);
linkData.setText(NetWorkUtils.ip2Host(task.get_host()));
linkData.addParam(UIDef.CLUSTER, paramMap.get(UIDef.CLUSTER));
linkData.addParam(UIDef.PAGE_TYPE, UIDef.PAGE_TYPE_SUPERVISOR);
linkData.addParam(UIDef.HOST, task.get_host());
ColumnData portColumn = new ColumnData();
portColumn.addText(String.valueOf(task.get_port()));
line.put(StringUtils.capitalize(UIDef.PORT), portColumn);
ColumnData logColumn = new ColumnData();
LinkData logLink = new LinkData();
logColumn.addLinkData(logLink);
line.put(UIDef.HEADER_LOG, logColumn);
logLink.setUrl(UIDef.LINK_LOG);
logLink.setText(UIDef.HEADER_LOG.toLowerCase());
logLink.addParam(UIDef.CLUSTER, paramMap.get(UIDef.CLUSTER));
logLink.addParam(UIDef.PAGE_TYPE, UIDef.PAGE_TYPE_LOG);
logLink.addParam(UIDef.HOST, task.get_host());
logLink.addParam(UIDef.TOPOLOGY, topologyId);
logLink.addParam(UIDef.PORT, String.valueOf(task.get_port()));
logLink.addParam(UIDef.LOG_SERVER_PORT, String.valueOf(logPort));
ColumnData errColumn = new ColumnData();
List<ErrorInfo> errList = task.get_errors();
if (errList == null || errList.size() == 0) {
errColumn.addText("");
} else {
for (ErrorInfo err : errList) {
errColumn.addText(err.get_error() + "\r\n");
}
}
line.put(UIDef.HEADER_ERROR, errColumn);
}
return table;
}
public TableData getTaskMetrics(TopologyInfo topologyInfo, List<TaskSummary> showTasks,
ComponentSummary componentSummary, Map<String, String> paramMap,
Integer window) {
TableData table = new TableData();
List<String> headers = table.getHeaders();
List<Map<String, ColumnData>> lines = table.getLines();
Map<Integer, MetricInfo> allTaskMetrics =
topologyInfo.get_metrics().get_taskMetric();
Map<Integer, MetricInfo> taskMetrics =
new TreeMap<Integer, MetricInfo>();
for ( TaskSummary taskSummary : showTasks) {
Integer taskId = taskSummary.get_task_id();
MetricInfo metric = allTaskMetrics.get(taskId);
if (metric == null) {
LOG.error("No task metric of " + taskId);
continue;
}
taskMetrics.put(taskId, metric);
}
headers.add(UIDef.HEADER_TASK_ID);
List<String> keys =
UIUtils.getSortedKeys(UIUtils.getKeys(taskMetrics.values()));
headers.addAll(keys);
for (Entry<Integer, MetricInfo> entry : taskMetrics.entrySet()) {
Integer taskId = entry.getKey();
MetricInfo metric = entry.getValue();
Map<String, ColumnData> line =
UIUtils.getMetricLine(metric, headers, window);
ColumnData taskIdColumn = new ColumnData();
taskIdColumn.addText(String.valueOf(taskId));
line.put(UIDef.HEADER_TASK_ID, taskIdColumn);
lines.add(line);
}
return table;
}
public Output generate(Map<String, String> paramMap) {
List<TableData> tables = new ArrayList<TableData>();
NimbusClient client = null;
try {
client = NimbusClientManager.getNimbusClient(paramMap);
String jsonConf = client.getClient().getNimbusConf();
Map<String, Object> nimbusConf =
(Map<String, Object>) Utils.from_json(jsonConf);
String topologyId = paramMap.get(UIDef.TOPOLOGY);
if (topologyId == null) {
throw new IllegalArgumentException("Not set topologyId");
}
String componentNam = paramMap.get(UIDef.COMPONENT);
if (componentNam == null) {
throw new IllegalArgumentException("Not set ComponentNam");
}
String windowStr = paramMap.get(UIDef.WINDOW);
Integer window = StatBuckets.getTimeKey(windowStr);
TopologyInfo topologyInfo =
client.getClient().getTopologyInfo(topologyId);
ComponentSummary componentSummary = null;
List<ComponentSummary> componentSummaries =
topologyInfo.get_components();
for (ComponentSummary item : componentSummaries) {
if (item.get_name().equals(componentNam)) {
componentSummary = item;
break;
}
}
if (componentSummary == null) {
throw new IllegalArgumentException("No Component of "
+ componentNam);
}
List<ComponentSummary> myComponentSummaryList =
new ArrayList<ComponentSummary>();
myComponentSummaryList.add(componentSummary);
TableData componentTable =
UIUtils.getComponentTable(topologyInfo,
myComponentSummaryList, topologyInfo.get_metrics()
.get_componentMetric(), paramMap, window);
tables.add(componentTable);
TableData inputTable =
getInputComponentMetrics(topologyInfo, componentSummary,
paramMap, window);
if (inputTable != null) {
tables.add(inputTable);
}
TableData outputTable =
getOutputComponentMetrics(topologyInfo, componentSummary,
paramMap, window);
if (outputTable != null) {
tables.add(outputTable);
}
List<TaskSummary> showTasks = new ArrayList<TaskSummary>();
TableData taskSummaryTable =
getTaskSummary(componentSummary, topologyInfo.get_tasks(),
paramMap, topologyId, nimbusConf, showTasks);
tables.add(taskSummaryTable);
TableData taskMetric =
getTaskMetrics(topologyInfo, showTasks, componentSummary, paramMap,
window);
tables.add(taskMetric);
Output ret = new Output();
ret.tables = tables;
ret.rawData = "";
if (componentSummary.get_task_ids().size() > UIUtils.ONE_TABLE_PAGE_SIZE) {
ret.pages = PageIndex.generatePageIndex(
componentSummary.get_task_ids().size(),
UIUtils.ONE_TABLE_PAGE_SIZE,
UIDef.LINK_WINDOW_TABLE, paramMap);
}
return ret;
} catch (Exception e) {
NimbusClientManager.removeClient(paramMap);
LOG.error(e.getMessage(), e);
return UIUtils.getErrorInfo(e);
}
}
}