blob: 3109bfee7f9af1a087613de5285d7aebae80aa5c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF)
* under one or more contributor license agreements.
* See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.daemon.ui;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.servlet.DispatcherType;
import javax.servlet.Servlet;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.DaemonConfig;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.BoltAggregateStats;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.CommonAggregateStats;
import org.apache.storm.generated.ComponentAggregateStats;
import org.apache.storm.generated.ComponentPageInfo;
import org.apache.storm.generated.ComponentType;
import org.apache.storm.generated.ErrorInfo;
import org.apache.storm.generated.ExecutorAggregateStats;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.GetInfoOptions;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.generated.LogConfig;
import org.apache.storm.generated.LogLevel;
import org.apache.storm.generated.LogLevelAction;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.NimbusSummary;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.NumErrorsChoice;
import org.apache.storm.generated.OwnerResourceSummary;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.RebalanceOptions;
import org.apache.storm.generated.SpecificAggregateStats;
import org.apache.storm.generated.SpoutAggregateStats;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SupervisorPageInfo;
import org.apache.storm.generated.SupervisorSummary;
import org.apache.storm.generated.TopologyHistoryInfo;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologyPageInfo;
import org.apache.storm.generated.TopologyStats;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.generated.WorkerSummary;
import org.apache.storm.logging.filters.AccessLoggingFilter;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.stats.StatsUtil;
import org.apache.storm.thrift.TException;
import org.apache.storm.utils.IVersionInfo;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.TopologySpoutLag;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
import org.apache.storm.utils.WebAppUtils;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.CrossOriginFilter;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class UIHelpers {
private static final Logger LOG = LoggerFactory.getLogger(UIHelpers.class);
private static final Object[][] PRETTY_SEC_DIVIDERS = {
new Object[]{ "s", 60 },
new Object[]{ "m", 60 },
new Object[]{ "h", 24 },
new Object[]{ "d", null }
};
private static final Object[][] PRETTY_MS_DIVIDERS = {
new Object[]{ "ms", 1000 },
new Object[]{ "s", 60 },
new Object[]{ "m", 60 },
new Object[]{ "h", 24 },
new Object[]{ "d", null }
};
/**
* Prettify uptime string.
* @param val val.
* @param dividers dividers.
* @return prettified uptime string.
*/
public static String prettyUptimeStr(String val, Object[][] dividers) {
int uptime = Integer.parseInt(val);
LinkedList<String> tmp = new LinkedList<>();
for (Object[] divider : dividers) {
if (uptime > 0) {
String state = (String) divider[0];
Integer div = (Integer) divider[1];
if (div != null) {
tmp.addFirst(uptime % div + state);
uptime = uptime / div;
} else {
tmp.addFirst(uptime + state);
}
}
}
return Joiner.on(" ").join(tmp);
}
/**
* Prettify uptime string.
* @param sec uptime in seconds.
* @return prettified uptime string.
*/
public static String prettyUptimeSec(String sec) {
return prettyUptimeStr(sec, PRETTY_SEC_DIVIDERS);
}
/**
* prettyUptimeSec.
* @param secs secs
* @return prettyUptimeSec
*/
public static String prettyUptimeSec(int secs) {
return prettyUptimeStr(String.valueOf(secs), PRETTY_SEC_DIVIDERS);
}
/**
* prettyUptimeMs.
* @param ms ms
* @return prettyUptimeMs
*/
public static String prettyUptimeMs(String ms) {
return prettyUptimeStr(ms, PRETTY_MS_DIVIDERS);
}
/**
* prettyUptimeMs.
* @param ms ms
* @return prettyUptimeMs
*/
public static String prettyUptimeMs(int ms) {
return prettyUptimeStr(String.valueOf(ms), PRETTY_MS_DIVIDERS);
}
/**
* url formatter for log links.
* @param fmt string format
* @param args hostname and other arguments.
* @return string formatter
*/
public static String urlFormat(String fmt, Object... args) {
String[] argsEncoded = new String[args.length];
for (int i = 0; i < args.length; i++) {
argsEncoded[i] = Utils.urlEncodeUtf8(String.valueOf(args[i]));
}
return String.format(fmt, argsEncoded);
}
/**
* Prettified executor info.
* @param e from Nimbus call
* @return prettified executor info string
*/
public static String prettyExecutorInfo(ExecutorInfo e) {
return "[" + e.get_task_start() + "-" + e.get_task_end() + "]";
}
/**
* Unauthorized user json.
* @param user User id.
* @return Unauthorized user json.
*/
public static Map<String, Object> unauthorizedUserJson(String user) {
return ImmutableMap.of(
"error", "No Authorization",
"errorMessage", String.format("User %s is not authorized.", user));
}
private static ServerConnector mkSslConnector(Server server, Integer port, String ksPath,
String ksPassword, String ksType,
String keyPassword, String tsPath,
String tsPassword, String tsType,
Boolean needClientAuth, Boolean wantClientAuth,
Integer headerBufferSize) {
SslContextFactory factory = new SslContextFactory();
factory.setExcludeCipherSuites("SSL_RSA_WITH_RC4_128_MD5", "SSL_RSA_WITH_RC4_128_SHA");
factory.setExcludeProtocols("SSLv3");
factory.setRenegotiationAllowed(false);
factory.setKeyStorePath(ksPath);
factory.setKeyStoreType(ksType);
factory.setKeyStorePassword(ksPassword);
factory.setKeyManagerPassword(keyPassword);
if (tsPath != null && tsPassword != null && tsType != null) {
factory.setTrustStorePath(tsPath);
factory.setTrustStoreType(tsType);
factory.setTrustStorePassword(tsPassword);
}
if (needClientAuth != null && needClientAuth) {
factory.setNeedClientAuth(true);
} else if (wantClientAuth != null && wantClientAuth) {
factory.setWantClientAuth(true);
}
HttpConfiguration httpsConfig = new HttpConfiguration();
httpsConfig.addCustomizer(new SecureRequestCustomizer());
if (null != headerBufferSize) {
httpsConfig.setRequestHeaderSize(headerBufferSize);
}
ServerConnector sslConnector = new ServerConnector(
server,
new SslConnectionFactory(factory, HttpVersion.HTTP_1_1.asString()),
new HttpConnectionFactory(httpsConfig)
);
sslConnector.setPort(port);
return sslConnector;
}
public static void configSsl(Server server, Integer port, String ksPath,
String ksPassword, String ksType,
String keyPassword, String tsPath,
String tsPassword, String tsType,
Boolean needClientAuth, Boolean wantClientAuth) {
configSsl(server, port, ksPath, ksPassword, ksType, keyPassword,
tsPath, tsPassword, tsType, needClientAuth, wantClientAuth, null);
}
/**
* configSsl.
* @param server server
* @param port port
* @param ksPath ksPath
* @param ksPassword ksPassword
* @param ksType ksType
* @param keyPassword keyPassword
* @param tsPath tsPath
* @param tsPassword tsPassword
* @param tsType tsType
* @param needClientAuth needClientAuth
* @param wantClientAuth wantClientAuth
* @param headerBufferSize headerBufferSize
*/
public static void configSsl(Server server, Integer port, String ksPath,
String ksPassword, String ksType,
String keyPassword, String tsPath,
String tsPassword, String tsType,
Boolean needClientAuth,
Boolean wantClientAuth, Integer headerBufferSize) {
if (port > 0) {
server.addConnector(
mkSslConnector(
server, port, ksPath, ksPassword, ksType, keyPassword,
tsPath, tsPassword, tsType,
needClientAuth, wantClientAuth, headerBufferSize
)
);
}
}
/**
* corsFilterHandle.
* @return corsFilterHandle
*/
public static FilterHolder corsFilterHandle() {
FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter());
filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, "*");
filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, "GET, POST, PUT");
filterHolder.setInitParameter(
CrossOriginFilter.ALLOWED_ORIGINS_PARAM,
"X-Requested-With, X-Requested-By, Access-Control-Allow-Origin,"
+ " Content-Type, Content-Length, Accept, Origin");
filterHolder.setInitParameter(CrossOriginFilter.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*");
return filterHolder;
}
/**
* mkAccessLoggingFilterHandle.
* @return mkAccessLoggingFilterHandle
*/
public static FilterHolder mkAccessLoggingFilterHandle() {
return new FilterHolder(new AccessLoggingFilter());
}
public static void configFilter(Server server, Servlet servlet, List<FilterConfiguration> filtersConfs) {
configFilter(server, servlet, filtersConfs, null);
}
/**
* Config filter.
* @param server Server
* @param servlet Servlet
* @param filtersConfs FiltersConfs
* @param params Filter params
*/
public static void configFilter(Server server, Servlet servlet,
List<FilterConfiguration> filtersConfs,
Map<String, String> params) {
if (filtersConfs != null) {
ServletHolder servletHolder = new ServletHolder(servlet);
servletHolder.setInitOrder(0);
if (params != null) {
servletHolder.setInitParameters(params);
}
ServletContextHandler context = new ServletContextHandler(server, "/");
context.addServlet(servletHolder, "/");
configFilters(context, filtersConfs);
server.setHandler(context);
}
}
/**
* Config filters.
* @param context Servlet context
* @param filtersConfs filter confs
*/
public static void configFilters(ServletContextHandler context,
List<FilterConfiguration> filtersConfs) {
context.addFilter(corsFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class));
for (FilterConfiguration filterConf : filtersConfs) {
String filterName = filterConf.getFilterName();
String filterClass = filterConf.getFilterClass();
Map<String, String> filterParams = filterConf.getFilterParams();
if (filterClass != null) {
FilterHolder filterHolder = new FilterHolder();
filterHolder.setClassName(filterClass);
if (filterName != null) {
filterHolder.setName(filterName);
} else {
filterHolder.setName(filterClass);
}
if (filterParams != null) {
filterHolder.setInitParameters(filterParams);
} else {
filterHolder.setInitParameters(new HashMap<>());
}
context.addFilter(filterHolder, "/*", EnumSet.allOf(DispatcherType.class));
}
}
context.addFilter(mkAccessLoggingFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class));
}
/**
* Construct a Jetty Server instance.
*/
public static Server jettyCreateServer(Integer port, String host, Integer httpsPort, Boolean disableHttpBinding) {
return jettyCreateServer(port, host, httpsPort, null, disableHttpBinding);
}
/**
* Construct a Jetty Server instance.
*/
public static Server jettyCreateServer(Integer port, String host,
Integer httpsPort, Integer headerBufferSize, Boolean disableHttpBinding) {
Server server = new Server();
if (httpsPort == null || httpsPort <= 0 || disableHttpBinding == null || disableHttpBinding == false) {
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.setSendDateHeader(true);
if (null != headerBufferSize) {
httpConfig.setRequestHeaderSize(headerBufferSize);
}
ServerConnector httpConnector = new ServerConnector(
server, new HttpConnectionFactory(httpConfig)
);
httpConnector.setPort(ObjectReader.getInt(port, 80));
httpConnector.setIdleTimeout(200000);
httpConnector.setHost(host);
server.addConnector(httpConnector);
}
return server;
}
/**
* Modified version of run-jetty
* Assumes configurator sets handler.
*/
public static void stormRunJetty(Integer port, String host,
Integer httpsPort, Integer headerBufferSize,
IConfigurator configurator) throws Exception {
Server s = jettyCreateServer(port, host, httpsPort, headerBufferSize, false);
if (configurator != null) {
configurator.execute(s);
}
s.start();
}
public static void stormRunJetty(Integer port, Integer headerBufferSize,
IConfigurator configurator) throws Exception {
stormRunJetty(port, null, null, headerBufferSize, configurator);
}
/**
* wrapJsonInCallback.
* @param callback callbackParameterName
* @param response response
* @return wrapJsonInCallback
*/
public static String wrapJsonInCallback(String callback, String response) {
return callback + "(" + response + ");";
}
/**
* getJsonResponseHeaders.
* @param callback callbackParameterName
* @param headers headers
* @return getJsonResponseHeaders
*/
public static Map getJsonResponseHeaders(String callback, Map headers) {
Map<String, String> headersResult = new HashMap<>();
headersResult.put("Cache-Control", "no-cache, no-store");
headersResult.put("Access-Control-Allow-Origin", "*");
headersResult.put("Access-Control-Allow-Headers",
"Content-Type, Access-Control-Allow-Headers, "
+ "Access-Controler-Allow-Origin, "
+ "X-Requested-By, X-Csrf-Token, "
+ "Authorization, X-Requested-With");
if (callback != null) {
headersResult.put("Content-Type", "application/javascript;charset=utf-8");
} else {
headersResult.put("Content-Type", "application/json;charset=utf-8");
}
if (headers != null) {
headersResult.putAll(headers);
}
return headersResult;
}
public static String getJsonResponseBody(Object data, String callback, boolean needSerialize) {
String serializedData = needSerialize ? JSONValue.toJSONString(data) : (String) data;
return callback != null ? wrapJsonInCallback(callback, serializedData) : serializedData;
}
/**
* Converts exception into json map.
* @param ex Exception to be converted.
* @param statusCode Status code to be returned.
* @return Map to be converted into json.
*/
public static Map exceptionToJson(Exception ex, int statusCode) {
StringWriter sw = new StringWriter();
ex.printStackTrace(new PrintWriter(sw));
return ImmutableMap.of(
"error", statusCode
+ " "
+ HttpStatus.getMessage(statusCode),
"errorMessage", sw.toString());
}
public static Response makeStandardResponse(Object data, String callback) {
return makeStandardResponse(data, callback, true, Response.Status.OK);
}
public static Response makeStandardResponse(Object data, String callback, Response.Status status) {
return makeStandardResponse(data, callback, true, status);
}
/**
* makeStandardResponse.
* @param data data
* @param callback callbackParameterName
* @param needsSerialization needsSerialization
* @return makeStandardResponse
*/
public static Response makeStandardResponse(
Object data, String callback, boolean needsSerialization, Response.Status status) {
String body = getJsonResponseBody(data, callback, needsSerialization);
Response.ResponseBuilder responseBuilder = Response.status(status).entity(body);
Map<String, String> headers = getJsonResponseHeaders(callback, null);
for (Map.Entry<String, String> headerEntry: headers.entrySet()) {
responseBuilder.header(headerEntry.getKey(), headerEntry.getValue());
}
return responseBuilder.build();
}
private static final AtomicReference<List<Map<String, String>>> MEMORIZED_VERSIONS = new AtomicReference<>();
private static final AtomicReference<Map<String, String>> MEMORIZED_FULL_VERSION = new AtomicReference<>();
private static Map<String, String> toJsonStruct(IVersionInfo info) {
Map<String, String> ret = new HashMap<>();
ret.put("version", info.getVersion());
ret.put("revision", info.getRevision());
ret.put("branch", info.getBranch());
ret.put("date", info.getDate());
ret.put("user", info.getUser());
ret.put("url", info.getUrl());
ret.put("srcChecksum", info.getSrcChecksum());
return ret;
}
/**
* Converts thrift call result into map fit for UI/api.
* @param clusterSummary Obtained from Nimbus.
* @param user User Making request
* @param conf Storm Conf
* @return Cluster Summary for display on UI/monitoring purposes via API
*/
public static Map<String, Object> getClusterSummary(ClusterSummary clusterSummary, String user,
Map<String, Object> conf) {
Map<String, Object> result = new HashMap();
if (MEMORIZED_VERSIONS.get() == null) {
//Races are okay this is just to avoid extra work for each page load.
NavigableMap<String, IVersionInfo> versionsMap = Utils.getAlternativeVersionsMap(conf);
List<Map<String, String>> versionList = new ArrayList<>();
for (Map.Entry<String, IVersionInfo> entry : versionsMap.entrySet()) {
Map<String, String> single = new HashMap<>(toJsonStruct(entry.getValue()));
single.put("versionMatch", entry.getKey());
versionList.add(single);
}
MEMORIZED_VERSIONS.set(versionList);
}
List<Map<String, String>> versions = MEMORIZED_VERSIONS.get();
if (!versions.isEmpty()) {
result.put("alternativeWorkerVersions", versions);
}
if (MEMORIZED_FULL_VERSION.get() == null) {
MEMORIZED_FULL_VERSION.set(toJsonStruct(VersionInfo.OUR_FULL_VERSION));
}
result.put("user", user);
result.put("stormVersion", VersionInfo.getVersion());
result.put("stormVersionInfo", MEMORIZED_FULL_VERSION.get());
List<SupervisorSummary> supervisorSummaries = clusterSummary.get_supervisors();
result.put("supervisors", supervisorSummaries.size());
result.put("topologies", clusterSummary.get_topologies_size());
int usedSlots =
supervisorSummaries.stream().mapToInt(
SupervisorSummary::get_num_used_workers).sum();
result.put("slotsUsed", usedSlots);
int totalSlots =
supervisorSummaries.stream().mapToInt(
SupervisorSummary::get_num_workers).sum();
result.put("slotsTotal", totalSlots);
result.put("slotsFree", totalSlots - usedSlots);
List<TopologySummary> topologySummaries = clusterSummary.get_topologies();
int totalTasks =
topologySummaries.stream().mapToInt(
TopologySummary::get_num_tasks).sum();
result.put("tasksTotal", totalTasks);
int totalExecutors =
topologySummaries.stream().mapToInt(
TopologySummary::get_num_executors).sum();
result.put("executorsTotal", totalExecutors);
double supervisorTotalMemory =
supervisorSummaries.stream().mapToDouble(x -> x.get_total_resources().getOrDefault(
Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME,
x.get_total_resources().get(Config.SUPERVISOR_MEMORY_CAPACITY_MB)
)
).sum();
result.put("totalMem", supervisorTotalMemory);
double supervisorTotalCpu =
supervisorSummaries.stream().mapToDouble(x -> x.get_total_resources().getOrDefault(
Constants.COMMON_CPU_RESOURCE_NAME,
x.get_total_resources().get(Config.SUPERVISOR_CPU_CAPACITY)
)
).sum();
result.put("totalCpu", supervisorTotalCpu);
double supervisorUsedMemory =
supervisorSummaries.stream().mapToDouble(SupervisorSummary::get_used_mem).sum();
result.put("availMem", supervisorTotalMemory - supervisorUsedMemory);
double supervisorUsedCpu =
supervisorSummaries.stream().mapToDouble(SupervisorSummary::get_used_cpu).sum();
result.put("availCpu", supervisorTotalCpu - supervisorUsedCpu);
result.put("fragmentedMem", supervisorSummaries.stream().mapToDouble(SupervisorSummary::get_fragmented_mem).sum());
result.put("fragmentedCpu", supervisorSummaries.stream().mapToDouble(SupervisorSummary::get_fragmented_cpu).sum());
result.put("schedulerDisplayResource",
conf.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
result.put("memAssignedPercentUtil", supervisorTotalMemory > 0
? StatsUtil.floatStr((supervisorUsedMemory * 100.0) / supervisorTotalMemory) : "0.0");
result.put("cpuAssignedPercentUtil", supervisorTotalCpu > 0
? StatsUtil.floatStr((supervisorUsedCpu * 100.0) / supervisorTotalCpu) : "0.0");
result.put("bugtracker-url", conf.get(DaemonConfig.UI_PROJECT_BUGTRACKER_URL));
result.put("central-log-url", conf.get(DaemonConfig.UI_CENTRAL_LOGGING_URL));
Map<String, Double> usedGenericResources = new HashMap<>();
Map<String, Double> totalGenericResources = new HashMap<>();
for (SupervisorSummary ss : supervisorSummaries) {
usedGenericResources = NormalizedResourceRequest.addResourceMap(usedGenericResources, ss.get_used_generic_resources());
totalGenericResources = NormalizedResourceRequest.addResourceMap(totalGenericResources, ss.get_total_resources());
}
Map<String, Double> availGenericResources = NormalizedResourceRequest
.subtractResourceMap(totalGenericResources, usedGenericResources);
result.put("availGenerics", prettifyGenericResources(availGenericResources));
result.put("totalGenerics", prettifyGenericResources(totalGenericResources));
return result;
}
private static String prettifyGenericResources(Map<String, Double> resourceMap) {
if (resourceMap == null) {
return null;
}
TreeMap<String, Double> treeGenericResources = new TreeMap<>(); // use TreeMap for deterministic ordering
treeGenericResources.putAll(resourceMap);
NormalizedResourceRequest.removeNonGenericResources(treeGenericResources);
return treeGenericResources.toString()
.replaceAll("[{}]", "")
.replace(",", "");
}
/**
* Prettify OwnerResourceSummary.
* @param ownerResourceSummary ownerResourceSummary
* @return Map of prettified OwnerResourceSummary.
*/
public static Map<String, Object> unpackOwnerResourceSummary(
OwnerResourceSummary ownerResourceSummary) {
Double memoryGuarantee = Double.valueOf(-1);
if (ownerResourceSummary.is_set_memory_guarantee()) {
memoryGuarantee = ownerResourceSummary.get_memory_guarantee();
}
Double cpuGuaranteee = Double.valueOf(-1);
if (ownerResourceSummary.is_set_cpu_guarantee()) {
cpuGuaranteee = ownerResourceSummary.get_cpu_guarantee();
}
int isolatedNodeGuarantee = -1;
if (ownerResourceSummary.is_set_isolated_node_guarantee()) {
isolatedNodeGuarantee = ownerResourceSummary.get_isolated_node_guarantee();
}
Double memoryGuaranteeRemaining = Double.valueOf(-1);
if (ownerResourceSummary.is_set_memory_guarantee_remaining()) {
memoryGuaranteeRemaining = ownerResourceSummary.get_memory_guarantee_remaining();
}
Double cpuGuaranteeRemaining = Double.valueOf(-1);
if (ownerResourceSummary.is_set_cpu_guarantee_remaining()) {
cpuGuaranteeRemaining = ownerResourceSummary.get_cpu_guarantee_remaining();
}
Map<String, Object> result = new HashMap();
result.put("owner", ownerResourceSummary.get_owner());
result.put("totalTopologies", ownerResourceSummary.get_total_topologies());
result.put("totalExecutors", ownerResourceSummary.get_total_executors());
result.put("totalWorkers", ownerResourceSummary.get_total_workers());
result.put("totalTasks", ownerResourceSummary.get_total_tasks());
result.put("totalMemoryUsage", ownerResourceSummary.get_memory_usage());
result.put("totalCpuUsage", ownerResourceSummary.get_cpu_usage());
result.put("memoryGuarantee", memoryGuarantee != -1 ? memoryGuarantee : "N/A");
result.put("cpuGuarantee", cpuGuaranteee != -1 ? cpuGuaranteee : "N/A");
result.put("isolatedNodes", isolatedNodeGuarantee);
result.put("memoryGuaranteeRemaining",
memoryGuaranteeRemaining != -1 ? memoryGuaranteeRemaining : "N/A");
result.put("cpuGuaranteeRemaining",
cpuGuaranteeRemaining != -1 ? cpuGuaranteeRemaining : "N/A");
result.put("totalReqOnHeapMem", ownerResourceSummary.get_requested_on_heap_memory());
result.put("totalReqOffHeapMem", ownerResourceSummary.get_requested_off_heap_memory());
result.put("totalReqMem", ownerResourceSummary.get_requested_total_memory());
result.put("totalReqCpu", ownerResourceSummary.get_requested_cpu());
result.put("totalAssignedOnHeapMem",
ownerResourceSummary.get_assigned_on_heap_memory()
);
result.put("totalAssignedOffHeapMem", ownerResourceSummary.get_assigned_off_heap_memory());
return result;
}
/**
* Get prettified ownerResourceSummaries.
* @param ownerResourceSummaries ownerResourceSummaries from thrift call
* @param conf Storm conf
* @return map to be converted to json.
*/
public static Map<String, Object> getOwnerResourceSummaries(
List<OwnerResourceSummary> ownerResourceSummaries, Map<String, Object> conf) {
Map<String, Object> result = new HashMap();
result.put("schedulerDisplayResource", conf.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
List<Map<String, Object>> ownerSummaries = new ArrayList();
for (OwnerResourceSummary ownerResourceSummary : ownerResourceSummaries) {
ownerSummaries.add(unpackOwnerResourceSummary(ownerResourceSummary));
}
result.put("owners", ownerSummaries);
return result;
}
/**
* getTopologyMap.
* @param topologySummary topologySummary
* @return getTopologyMap
*/
public static Map<String, Object> getTopologyMap(TopologySummary topologySummary) {
Map<String, Object> result = new HashMap();
result.put("id", topologySummary.get_id());
result.put("encodedId", Utils.urlEncodeUtf8(topologySummary.get_id()));
result.put("owner", topologySummary.get_owner());
result.put("name", topologySummary.get_name());
result.put("status", topologySummary.get_status());
result.put("uptime", UIHelpers.prettyUptimeSec(topologySummary.get_uptime_secs()));
result.put("uptimeSeconds", topologySummary.get_uptime_secs());
result.put("tasksTotal", topologySummary.get_num_tasks());
result.put("workersTotal", topologySummary.get_num_workers());
result.put("executorsTotal", topologySummary.get_num_executors());
result.put("replicationCount", topologySummary.get_replication_count());
result.put("schedulerInfo", topologySummary.get_sched_status());
result.put("requestedMemOnHeap", topologySummary.get_requested_memonheap());
result.put("requestedMemOffHeap", topologySummary.get_requested_memoffheap());
result.put("requestedTotalMem",
topologySummary.get_requested_memoffheap()
+ topologySummary.get_assigned_memonheap());
result.put("requestedCpu", topologySummary.get_requested_cpu());
result.put("requestedGenericResources", prettifyGenericResources(topologySummary.get_requested_generic_resources()));
result.put("assignedMemOnHeap", topologySummary.get_assigned_memonheap());
result.put("assignedMemOffHeap", topologySummary.get_assigned_memoffheap());
result.put("assignedTotalMem",
topologySummary.get_assigned_memoffheap()
+ topologySummary.get_assigned_memonheap());
result.put("assignedCpu", topologySummary.get_assigned_cpu());
result.put("assignedGenericResources", prettifyGenericResources(topologySummary.get_assigned_generic_resources()));
result.put("topologyVersion", topologySummary.get_topology_version());
result.put("stormVersion", topologySummary.get_storm_version());
return result;
}
/**
* Get a specific owner resource summary.
* @param ownerResourceSummaries Result from thrift call.
* @param client client
* @param id Owner id.
* @param config Storm conf.
* @return prettified owner resource summary.
*/
public static Map<String, Object> getOwnerResourceSummary(
List<OwnerResourceSummary> ownerResourceSummaries,
Nimbus.Iface client, String id, Map<String, Object> config) throws TException {
Map<String, Object> result = new HashMap();
result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
if (ownerResourceSummaries.isEmpty()) {
return unpackOwnerResourceSummary(new OwnerResourceSummary(id));
}
List<TopologySummary> topologies = null;
topologies = client.getClusterInfo().get_topologies();
List<Map> topologySummaries = getTopologiesMap(id, topologies);
result.putAll(unpackOwnerResourceSummary(ownerResourceSummaries.get(0)));
result.put("topologies", topologySummaries);
return result;
}
/**
* getTopologiesMap.
* @param id id
* @param topologies topologies
* @return getTopologiesMap
*/
private static List<Map> getTopologiesMap(String id, List<TopologySummary> topologies) {
List<Map> topologySummaries = new ArrayList();
for (TopologySummary topologySummary : topologies) {
if (id == null || topologySummary.get_owner().equals(id)) {
topologySummaries.add(getTopologyMap(topologySummary));
}
}
return topologySummaries;
}
/**
* getLogviewerLink.
* @param host host
* @param fname fname
* @param config config
* @param port port
* @return getLogviewerLink.
*/
public static String getLogviewerLink(String host, String fname,
Map<String, Object> config, int port) {
if (isSecureLogviewer(config)) {
return UIHelpers.urlFormat("https://%s:%s/api/v1/log?file=%s",
host, config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT), fname);
} else {
return UIHelpers.urlFormat("http://%s:%s/api/v1/log?file=%s",
host, config.get(DaemonConfig.LOGVIEWER_PORT), fname);
}
}
/**
* Get log link to nimbus log.
* @param host nimbus host name
* @param config storm config
* @return log link.
*/
public static String getNimbusLogLink(String host, Map<String, Object> config) {
if (isSecureLogviewer(config)) {
return UIHelpers.urlFormat("https://%s:%s/api/v1/daemonlog?file=nimbus.log",
host, config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT));
}
return UIHelpers.urlFormat("http://%s:%s/api/v1/daemonlog?file=nimbus.log",
host, config.get(DaemonConfig.LOGVIEWER_PORT));
}
/**
* Get log link to supervisor log.
* @param host supervisor host name
* @param config storm config
* @return log link.
*/
public static String getSupervisorLogLink(String host, Map<String, Object> config) {
if (isSecureLogviewer(config)) {
return UIHelpers.urlFormat("https://%s:%s/api/v1/daemonlog?file=supervisor.log",
host, config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT));
}
return UIHelpers.urlFormat("http://%s:%s/api/v1/daemonlog?file=supervisor.log",
host, config.get(DaemonConfig.LOGVIEWER_PORT));
}
/**
* Get log link to supervisor log.
* @param host supervisor host name
* @param config storm config
* @return log link.
*/
public static String getWorkerLogLink(String host, int port,
Map<String, Object> config, String topologyId) {
return getLogviewerLink(host,
WebAppUtils.logsFilename(
topologyId, String.valueOf(port)),
config, port
);
}
/**
* Get supervisor info in a map.
* @param supervisorSummary from nimbus call.
* @param config Storm config.
* @return prettified supervisor info map.
*/
public static Map<String, Object> getPrettifiedSupervisorMap(
SupervisorSummary supervisorSummary,
Map<String, Object> config) {
Map<String, Object> result = new HashMap();
result.put("id", supervisorSummary.get_supervisor_id());
result.put("host", supervisorSummary.get_host());
result.put("uptime", UIHelpers.prettyUptimeSec(supervisorSummary.get_uptime_secs()));
result.put("blacklisted", supervisorSummary.is_blacklisted());
result.put("uptimeSeconds", supervisorSummary.get_uptime_secs());
result.put("slotsTotal", supervisorSummary.get_num_workers());
result.put("slotsUsed", supervisorSummary.get_num_used_workers());
result.put("slotsFree",
Integer.max(supervisorSummary.get_num_workers()
- supervisorSummary.get_num_used_workers(), 0));
Map<String, Double> totalResources = supervisorSummary.get_total_resources();
Double totalMemory = totalResources.getOrDefault(
Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME,
totalResources.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB)
);
result.put("totalMem",
totalMemory
);
Double totalCpu = totalResources.getOrDefault(
Constants.COMMON_CPU_RESOURCE_NAME,
totalResources.get(Config.SUPERVISOR_CPU_CAPACITY)
);
result.put("totalCpu",
totalCpu);
result.put("usedMem", supervisorSummary.get_used_mem());
result.put("usedCpu", supervisorSummary.get_used_cpu());
result.put(
"logLink",
getSupervisorLogLink(supervisorSummary.get_host(), config)
);
result.put("availMem", totalMemory - supervisorSummary.get_used_mem());
result.put("availCpu", totalCpu - supervisorSummary.get_used_cpu());
result.put("version", supervisorSummary.get_version());
Map<String, Double> totalGenericResources = new HashMap<>(totalResources);
result.put("totalGenericResources", prettifyGenericResources(totalGenericResources));
Map<String, Double> usedGenericResources = supervisorSummary.get_used_generic_resources();
result.put("usedGenericResources", prettifyGenericResources(usedGenericResources));
Map<String, Double> availGenericResources = NormalizedResourceRequest
.subtractResourceMap(totalGenericResources, usedGenericResources);
result.put("availGenericResources", prettifyGenericResources(availGenericResources));
return result;
}
/**
* Get topology history.
* @param topologyHistory from Nimbus call.
* @return map ready to be returned.
*/
public static Map<String, Object> getTopologyHistoryInfo(TopologyHistoryInfo topologyHistory) {
Map<String, Object> result = new HashMap();
result.put("topo-history", topologyHistory.get_topo_ids());
return result;
}
/**
* Check if logviewer is secure.
* @param config Storm config.
* @return true if logiviwer is secure.
*/
public static boolean isSecureLogviewer(Map<String, Object> config) {
if (config.containsKey(DaemonConfig.LOGVIEWER_HTTPS_PORT)) {
int logviewerPort = (int) config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT);
if (logviewerPort >= 0) {
return true;
}
}
return false;
}
/**
* Get logviewer port depending on whether the logviewer is secure or not.
* @param config Storm config.
* @return appropriate port.
*/
public static int getLogviewerPort(Map<String, Object> config) {
if (isSecureLogviewer(config)) {
return (int) config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT);
}
return (int) config.get(DaemonConfig.LOGVIEWER_PORT);
}
/**
* getWorkerSummaries.
* @param supervisorPageInfo supervisorPageInfo
* @param config config
* @return getWorkerSummaries
*/
public static List<Map> getWorkerSummaries(SupervisorPageInfo supervisorPageInfo,
Map<String, Object> config) {
List<Map> workerSummaries = new ArrayList();
if (supervisorPageInfo.is_set_worker_summaries()) {
for (WorkerSummary workerSummary : supervisorPageInfo.get_worker_summaries()) {
workerSummaries.add(getWorkerSummaryMap(workerSummary, config));
}
}
return workerSummaries;
}
/**
* getWorkerSummaryMap.
* @param workerSummary workerSummary
* @param config config
* @return getWorkerSummaryMap
*/
private static Map getWorkerSummaryMap(WorkerSummary workerSummary, Map<String, Object> config) {
Map<String, Object> result = new HashMap();
result.put("supervisorId", workerSummary.get_supervisor_id());
result.put("host", workerSummary.get_host());
result.put("port", workerSummary.get_port());
result.put("topologyId", workerSummary.get_topology_id());
result.put("topologyName", workerSummary.get_topology_name());
result.put("executorsTotal", workerSummary.get_num_executors());
result.put("assignedMemOnHeap", workerSummary.get_assigned_memonheap());
result.put("assignedMemOffHeap", workerSummary.get_assigned_memoffheap());
result.put("assignedCpu", workerSummary.get_assigned_cpu());
result.put("componentNumTasks", workerSummary.get_component_to_num_tasks());
result.put("uptime", UIHelpers.prettyUptimeSec(workerSummary.get_uptime_secs()));
result.put("uptimeSeconds", workerSummary.get_uptime_secs());
result.put("workerLogLink", getWorkerLogLink(workerSummary.get_host(),
workerSummary.get_port(), config, workerSummary.get_topology_id()));
result.put("owner", workerSummary.get_owner());
return result;
}
/**
* getSupervisorSummary.
* @param supervisors supervisor summary list.
* @param securityContext security context injected.
* @param config Storm config.
* @return Prettified JSON.
*/
public static Map<String, Object> getSupervisorSummary(
List<SupervisorSummary> supervisors,
SecurityContext securityContext, Map<String, Object> config) {
Map<String, Object> result = new HashMap();
addLogviewerInfo(config, result);
List<Map> supervisorMaps = getSupervisorsMap(supervisors, config);
result.put("supervisors", supervisorMaps);
result.put("schedulerDisplayResource",
config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE)
);
return result;
}
/**
* getSupervisorsMap.
* @param supervisors supervisors
* @param config config
* @return getSupervisorsMap
*/
private static List<Map> getSupervisorsMap(List<SupervisorSummary> supervisors,
Map<String, Object> config) {
List<Map> supervisorMaps = new ArrayList<>();
for (SupervisorSummary supervisorSummary : supervisors) {
supervisorMaps.add(getPrettifiedSupervisorMap(supervisorSummary, config));
}
return supervisorMaps;
}
/**
* addLogviewerInfo.
* @param config config
* @param result result
*/
private static void addLogviewerInfo(Map<String, Object> config, Map<String, Object> result) {
result.put("logviewerPort", getLogviewerPort(config));
String logviewerScheme = "http";
if (isSecureLogviewer(config)) {
logviewerScheme = "https";
}
result.put("logviewerScheme", logviewerScheme);
}
/**
* getSupervisorPageInfo.
* @param supervisorPageInfo supervisorPageInfo
* @param config config
* @return getSupervisorPageInfo
*/
public static Map<String, Object> getSupervisorPageInfo(
SupervisorPageInfo supervisorPageInfo, Map<String,Object> config) {
Map<String, Object> result = new HashMap<>();
result.put("workers", getWorkerSummaries(supervisorPageInfo, config));
result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
List<Map> supervisorMaps = getSupervisorsMap(supervisorPageInfo.get_supervisor_summaries(), config);
result.put("supervisors", supervisorMaps);
addLogviewerInfo(config, result);
return result;
}
/**
* getAllTopologiesSummary.
* @param topologies topologies
* @param config config
* @return getAllTopologiesSummary
*/
public static Map<String, Object> getAllTopologiesSummary(
List<TopologySummary> topologies, Map<String,Object> config) {
Map<String, Object> result = new HashMap();
result.put("topologies", getTopologiesMap(null, topologies));
result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
return result;
}
/**
* getWindowHint.
* @param window window
* @return getWindowHint
*/
public static String getWindowHint(String window) {
if (window.equals(":all-time")) {
return "All time";
}
return UIHelpers.prettyUptimeSec(window);
}
/**
* getStatDisplayMap.
* @param rawDisplayMap rawDisplayMap
* @return getStatDisplayMap
*/
public static Map<String, Double> getStatDisplayMap(Map<String, Double> rawDisplayMap) {
Map<String, Double> result = new HashMap();
for (Map.Entry<String, Double> entry : rawDisplayMap.entrySet()) {
result.put(getWindowHint(entry.getKey()), entry.getValue());
}
return result;
}
/**
* getTopologySummary.
* @param topologyPageInfo topologyPageInfo
* @param window window
* @param config config
* @param remoteUser remoteUser
* @return getTopologySummary
*/
public static Map<String, Object> getTopologySummary(TopologyPageInfo topologyPageInfo,
String window, Map<String, Object> config, String remoteUser) {
Map<String, Object> result = new HashMap();
Map<String, Object> topologyConf = (Map<String, Object>) JSONValue.parse(topologyPageInfo.get_topology_conf());
long messageTimeout = (long) topologyConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
Map<String, Object> unpackedTopologyPageInfo =
unpackTopologyInfo(topologyPageInfo, window, config);
result.putAll(unpackedTopologyPageInfo);
result.put("user", remoteUser);
result.put("window", window);
result.put("windowHint", getWindowHint(window));
result.put("msgTimeout", messageTimeout);
result.put("configuration", topologyConf);
result.put("visualizationTable", new ArrayList());
result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
result.put("bugtracker-url", config.get(DaemonConfig.UI_PROJECT_BUGTRACKER_URL));
result.put("central-log-url", config.get(DaemonConfig.UI_CENTRAL_LOGGING_URL));
return result;
}
/**
* getStatDisplayMapLong.
* @param windowToTransferred windowToTransferred
* @return getStatDisplayMapLong
*/
private static Map<String, Long> getStatDisplayMapLong(Map<String,Long> windowToTransferred) {
Map<String, Long> result = new HashMap();
for (Map.Entry<String, Long> entry : windowToTransferred.entrySet()) {
result.put(entry.getKey(), entry.getValue());
}
return result;
}
/**
* getCommonAggStatsMap.
* @param commonAggregateStats commonAggregateStats
* @return getCommonAggStatsMap
*/
private static Map<String, Object> getCommonAggStatsMap(CommonAggregateStats commonAggregateStats) {
Map<String, Object> result = new HashMap();
result.put("executors", commonAggregateStats.get_num_executors());
result.put("tasks", commonAggregateStats.get_num_tasks());
result.put("emitted", commonAggregateStats.get_emitted());
result.put("transferred", commonAggregateStats.get_transferred());
result.put("acked", commonAggregateStats.get_acked());
result.put("failed", commonAggregateStats.get_failed());
if (commonAggregateStats.is_set_resources_map()) {
result.put(
"requestedMemOnHeap",
commonAggregateStats.get_resources_map().get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME)
);
result.put(
"requestedMemOffHeap",
commonAggregateStats.get_resources_map().get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME));
result.put(
"requestedCpu",
commonAggregateStats.get_resources_map().get(Constants.COMMON_CPU_RESOURCE_NAME));
result.put(
"requestedGenericResourcesComp",
prettifyGenericResources(commonAggregateStats.get_resources_map()));
}
return result;
}
/**
* getTruncatedErrorString.
* @param errorString errorString
* @return getTruncatedErrorString
*/
private static String getTruncatedErrorString(String errorString) {
return errorString.substring(0, Math.min(errorString.length(), 200));
}
/**
* getSpoutAggStatsMap.
* @param componentAggregateStats componentAggregateStats
* @param window window
* @return getSpoutAggStatsMap
*/
private static Map<String, Object> getSpoutAggStatsMap(
ComponentAggregateStats componentAggregateStats, String window) {
Map<String, Object> result = new HashMap();
SpoutAggregateStats spoutAggregateStats = componentAggregateStats.get_specific_stats().get_spout();
CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
result.put("window", window);
result.put("windowPretty", getWindowHint(window));
result.put("emitted", commonStats.get_emitted());
result.put("transferred", commonStats.get_transferred());
result.put("acked", commonStats.get_acked());
result.put("failed", commonStats.get_failed());
result.put("completeLatency", spoutAggregateStats.get_complete_latency_ms());
ErrorInfo lastError = componentAggregateStats.get_last_error();
result.put("lastError", Objects.isNull(lastError) ? "" : getTruncatedErrorString(lastError.get_error()));
return result;
}
/**
* getBoltAggStatsMap.
* @param componentAggregateStats componentAggregateStats
* @param window window
* @return getBoltAggStatsMap
*/
private static Map<String, Object> getBoltAggStatsMap(
ComponentAggregateStats componentAggregateStats, String window) {
Map<String, Object> result = new HashMap();
CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
result.put("window", window);
result.put("windowPretty", getWindowHint(window));
result.put("emitted", commonStats.get_emitted());
result.put("transferred", commonStats.get_transferred());
result.put("acked", commonStats.get_acked());
result.put("failed", commonStats.get_failed());
BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt();
result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
result.put("executed", boltAggregateStats.get_executed());
result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
result.put("capacity", StatsUtil.floatStr(boltAggregateStats.get_capacity()));
return result;
}
/**
* nullToZero.
* @param value value
* @return nullToZero
*/
private static Long nullToZero(Long value) {
return !Objects.isNull(value) ? value : 0;
}
/**
* nullToZero.
* @param value value
* @return nullToZero
*/
private static Double nullToZero(Double value) {
return !Objects.isNull(value) ? value : 0;
}
/**
* getBoltInputStats.
* @param globalStreamId globalStreamId
* @param componentAggregateStats componentAggregateStats
* @return getBoltInputStats
*/
private static Map<String, Object> getBoltInputStats(GlobalStreamId globalStreamId,
ComponentAggregateStats componentAggregateStats) {
Map<String, Object> result = new HashMap();
SpecificAggregateStats specificAggregateStats = componentAggregateStats.get_specific_stats();
BoltAggregateStats boltAggregateStats = specificAggregateStats.get_bolt();
CommonAggregateStats commonAggregateStats = componentAggregateStats.get_common_stats();
String componentId = globalStreamId.get_componentId();
result.put("component", componentId);
result.put("encodedComponentId", Utils.urlEncodeUtf8(componentId));
result.put("stream", globalStreamId.get_streamId());
result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
result.put("executed", nullToZero(boltAggregateStats.get_executed()));
result.put("acked", nullToZero(commonAggregateStats.get_acked()));
result.put("failed", nullToZero(commonAggregateStats.get_failed()));
return result;
}
/**
* getBoltOutputStats.
* @param streamId streamId
* @param componentAggregateStats componentAggregateStats
* @return getBoltOutputStats
*/
private static Map<String, Object> getBoltOutputStats(String streamId,
ComponentAggregateStats componentAggregateStats) {
Map<String, Object> result = new HashMap();
result.put("stream", streamId);
CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
result.put("emitted", nullToZero(commonStats.get_emitted()));
result.put("transferred", nullToZero(commonStats.get_transferred()));
return result;
}
/**
* getSpoutOutputStats.
* @param streamId streamId
* @param componentAggregateStats componentAggregateStats
* @return getSpoutOutputStats
*/
private static Map<String, Object> getSpoutOutputStats(String streamId,
ComponentAggregateStats componentAggregateStats) {
SpecificAggregateStats specificAggregateStats = componentAggregateStats.get_specific_stats();
SpoutAggregateStats spoutAggregateStats = specificAggregateStats.get_spout();
Map<String, Object> result = new HashMap();
result.put("stream", streamId);
CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
result.put("emitted", nullToZero(commonStats.get_emitted()));
result.put("transferred", nullToZero(commonStats.get_transferred()));
result.put("completeLatency", StatsUtil.floatStr(spoutAggregateStats.get_complete_latency_ms()));
result.put("acked", nullToZero(commonStats.get_acked()));
result.put("failed", nullToZero(commonStats.get_failed()));
return result;
}
/**
* getBoltExecutorStats.
* @param topologyId topologyId
* @param config config
* @param executorAggregateStats executorAggregateStats
* @return getBoltExecutorStats
*/
private static Map<String, Object> getBoltExecutorStats(String topologyId, Map<String, Object> config,
ExecutorAggregateStats executorAggregateStats) {
Map<String, Object> result = new HashMap();
ExecutorSummary executorSummary = executorAggregateStats.get_exec_summary();
ExecutorInfo executorInfo = executorSummary.get_executor_info();
String executorId = prettyExecutorInfo(executorInfo);
result.put("id", executorId);
result.put("encodedId", Utils.urlEncodeUtf8(executorId));
result.put("uptime", prettyUptimeSec(executorSummary.get_uptime_secs()));
result.put("uptimeSeconds", executorSummary.get_uptime_secs());
String host = executorSummary.get_host();
result.put("host", host);
int port = executorSummary.get_port();
result.put("port", port);
ComponentAggregateStats componentAggregateStats = executorAggregateStats.get_stats();
CommonAggregateStats commonAggregateStats = componentAggregateStats.get_common_stats();
result.put("emitted", nullToZero(commonAggregateStats.get_emitted()));
result.put("transferred", nullToZero(commonAggregateStats.get_transferred()));
SpecificAggregateStats specificAggregateStats = componentAggregateStats.get_specific_stats();
BoltAggregateStats boltAggregateStats = specificAggregateStats.get_bolt();
result.put("capacity", StatsUtil.floatStr(nullToZero(boltAggregateStats.get_capacity())));
result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
result.put("executed", nullToZero(boltAggregateStats.get_executed()));
result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
result.put("acked", nullToZero(commonAggregateStats.get_acked()));
result.put("failed", nullToZero(commonAggregateStats.get_failed()));
result.put("workerLogLink", getWorkerLogLink(host, port, config, topologyId));
return result;
}
/**
* getSpoutExecutorStats.
* @param topologyId topologyId
* @param config config
* @param executorAggregateStats executorAggregateStats
* @return getSpoutExecutorStats
*/
private static Map<String, Object> getSpoutExecutorStats(String topologyId, Map<String, Object> config,
ExecutorAggregateStats executorAggregateStats) {
Map<String, Object> result = new HashMap();
ExecutorSummary executorSummary = executorAggregateStats.get_exec_summary();
ExecutorInfo executorInfo = executorSummary.get_executor_info();
ComponentAggregateStats componentAggregateStats = executorAggregateStats.get_stats();
SpecificAggregateStats specificAggregateStats = componentAggregateStats.get_specific_stats();
SpoutAggregateStats spoutAggregateStats = specificAggregateStats.get_spout();
CommonAggregateStats commonAggregateStats = componentAggregateStats.get_common_stats();
String executorId = prettyExecutorInfo(executorInfo);
result.put("id", executorId);
result.put("encodedId", Utils.urlEncodeUtf8(executorId));
result.put("uptime", prettyUptimeSec(executorSummary.get_uptime_secs()));
result.put("uptimeSeconds", executorSummary.get_uptime_secs());
String host = executorSummary.get_host();
result.put("host", host);
int port = executorSummary.get_port();
result.put("port", port);
result.put("emitted", nullToZero(commonAggregateStats.get_emitted()));
result.put("transferred", nullToZero(commonAggregateStats.get_transferred()));
result.put("completeLatency", StatsUtil.floatStr(spoutAggregateStats.get_complete_latency_ms()));
result.put("acked", nullToZero(commonAggregateStats.get_acked()));
result.put("failed", nullToZero(commonAggregateStats.get_failed()));
result.put("workerLogLink", getWorkerLogLink(host, port, config, topologyId));
return result;
}
/**
* getComponentErrorInfo.
* @param errorInfo errorInfo
* @param config config
* @param topologyId topologyId
* @return getComponentErrorInfo
*/
private static Map<String, Object> getComponentErrorInfo(ErrorInfo errorInfo, Map config,
String topologyId) {
Map<String, Object> result = new HashMap();
result.put("errorTime",
errorInfo.get_error_time_secs());
String host = errorInfo.get_host();
result.put("errorHost", host);
int port = errorInfo.get_port();
result.put("errorPort", port);
result.put("errorWorkerLogLink", getWorkerLogLink(host, port, config, topologyId));
result.put("errorLapsedSecs", Time.deltaSecs(errorInfo.get_error_time_secs()));
result.put("error", errorInfo.get_error());
return result;
}
/**
* getComponentErrors.
* @param errorInfoList errorInfoList
* @param topologyId topologyId
* @param config config
* @return getComponentErrors
*/
private static Map<String, Object> getComponentErrors(List<ErrorInfo> errorInfoList,
String topologyId, Map config) {
Map<String, Object> result = new HashMap();
errorInfoList.sort(Comparator.comparingInt(ErrorInfo::get_error_time_secs));
result.put(
"componentErrors",
errorInfoList.stream().map(e -> getComponentErrorInfo(e, config, topologyId))
.collect(Collectors.toList())
);
return result;
}
/**
* getTopologyErrors.
* @param errorInfoList errorInfoList
* @param topologyId topologyId
* @param config config
* @return getTopologyErrors
*/
private static Map<String, Object> getTopologyErrors(List<ErrorInfo> errorInfoList,
String topologyId, Map config) {
Map<String, Object> result = new HashMap();
errorInfoList.sort(Comparator.comparingInt(ErrorInfo::get_error_time_secs));
result.put(
"topologyErrors",
errorInfoList.stream().map(e -> getComponentErrorInfo(e, config, topologyId))
.collect(Collectors.toList())
);
return result;
}
/**
* getTopologySpoutAggStatsMap.
* @param componentAggregateStats componentAggregateStats
* @param spoutId spoutId
* @return getTopologySpoutAggStatsMap
*/
private static Map<String, Object> getTopologySpoutAggStatsMap(ComponentAggregateStats componentAggregateStats,
String spoutId) {
Map<String, Object> result = new HashMap();
CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
result.putAll(getCommonAggStatsMap(commonStats));
result.put("spoutId", spoutId);
result.put("encodedSpoutId", Utils.urlEncodeUtf8(spoutId));
SpoutAggregateStats spoutAggregateStats = componentAggregateStats.get_specific_stats().get_spout();
result.put("completeLatency", spoutAggregateStats.get_complete_latency_ms());
ErrorInfo lastError = componentAggregateStats.get_last_error();
result.put("lastError", Objects.isNull(lastError) ? "" : getTruncatedErrorString(lastError.get_error()));
return result;
}
/**
* getTopologyBoltAggStatsMap.
* @param componentAggregateStats componentAggregateStats
* @param boltId boltId
* @return getTopologyBoltAggStatsMap
*/
private static Map<String, Object> getTopologyBoltAggStatsMap(ComponentAggregateStats componentAggregateStats,
String boltId) {
Map<String, Object> result = new HashMap();
CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
result.putAll(getCommonAggStatsMap(commonStats));
result.put("boltId", boltId);
result.put("encodedBoltId", Utils.urlEncodeUtf8(boltId));
BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt();
result.put("capacity", StatsUtil.floatStr(boltAggregateStats.get_capacity()));
result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
result.put("executed", boltAggregateStats.get_executed());
result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
ErrorInfo lastError = componentAggregateStats.get_last_error();
result.put("lastError", Objects.isNull(lastError) ? "" : getTruncatedErrorString(lastError.get_error()));
return result;
}
/**
* getTopologyStatsMap.
* @param topologyStats topologyStats
* @return getTopologyStatsMap
*/
private static List<Map> getTopologyStatsMap(TopologyStats topologyStats) {
List<Map> result = new ArrayList();
Map<String, Long> emittedStatDisplayMap = getStatDisplayMapLong(topologyStats.get_window_to_emitted());
Map<String, Long> transferred = getStatDisplayMapLong(topologyStats.get_window_to_transferred());
Map<String, Double> completeLatency = getStatDisplayMap(topologyStats.get_window_to_complete_latencies_ms());
Map<String, Long> acked = getStatDisplayMapLong(topologyStats.get_window_to_acked());
Map<String, Long> failed = getStatDisplayMapLong(topologyStats.get_window_to_failed());
for (String window : emittedStatDisplayMap.keySet()) {
Map<String, Object> temp = new HashMap();
temp.put("windowPretty", getWindowHint(window));
temp.put("window", window);
temp.put("emitted", emittedStatDisplayMap.get(window));
temp.put("transferred", transferred.get(window));
temp.put("completeLatency", StatsUtil.floatStr(completeLatency.get(getWindowHint(window))));
temp.put("acked", acked.get(window));
temp.put("failed", failed.get(window));
result.add(temp);
}
return result;
}
/**
* unpackTopologyInfo.
* @param topologyPageInfo topologyPageInfo
* @param window window
* @param config config
* @return unpackTopologyInfo
*/
private static Map<String,Object> unpackTopologyInfo(TopologyPageInfo topologyPageInfo, String window, Map<String,Object> config) {
Map<String, Object> result = new HashMap();
result.put("id", topologyPageInfo.get_id());
result.put("encodedId", Utils.urlEncodeUtf8(topologyPageInfo.get_id()));
result.put("owner", topologyPageInfo.get_owner());
result.put("name", topologyPageInfo.get_name());
result.put("status", topologyPageInfo.get_status());
result.put("uptime", UIHelpers.prettyUptimeSec(topologyPageInfo.get_uptime_secs()));
result.put("uptimeSeconds", topologyPageInfo.get_uptime_secs());
result.put("tasksTotal", topologyPageInfo.get_num_tasks());
result.put("workersTotal", topologyPageInfo.get_num_workers());
result.put("executorsTotal", topologyPageInfo.get_num_executors());
result.put("schedulerInfo", topologyPageInfo.get_sched_status());
result.put("requestedMemOnHeap", topologyPageInfo.get_requested_memonheap());
result.put("requestedMemOffHeap", topologyPageInfo.get_requested_memoffheap());
result.put("requestedCpu", topologyPageInfo.get_requested_cpu());
result.put("requestedTotalMem",
topologyPageInfo.get_requested_memonheap() + topologyPageInfo.get_requested_memoffheap()
);
result.put("assignedMemOnHeap", topologyPageInfo.get_assigned_memonheap());
result.put("assignedMemOffHeap", topologyPageInfo.get_assigned_memoffheap());
result.put("assignedTotalMem", topologyPageInfo.get_assigned_memonheap()
+ topologyPageInfo.get_assigned_memoffheap());
result.put("assignedCpu", topologyPageInfo.get_assigned_cpu());
result.put("requestedRegularOnHeapMem", topologyPageInfo.get_requested_regular_on_heap_memory());
result.put("requestedSharedOnHeapMem", topologyPageInfo.get_requested_shared_on_heap_memory());
result.put("requestedRegularOffHeapMem", topologyPageInfo.get_requested_regular_off_heap_memory());
result.put("requestedSharedOffHeapMem", topologyPageInfo.get_requested_shared_off_heap_memory());
result.put("requestedGenericResources", prettifyGenericResources(topologyPageInfo.get_requested_generic_resources()));
result.put("assignedRegularOnHeapMem", topologyPageInfo.get_assigned_regular_on_heap_memory());
result.put("assignedSharedOnHeapMem", topologyPageInfo.get_assigned_shared_on_heap_memory());
result.put("assignedRegularOffHeapMem", topologyPageInfo.get_assigned_regular_off_heap_memory());
result.put("assignedSharedOffHeapMem", topologyPageInfo.get_assigned_shared_off_heap_memory());
result.put("assignedGenericResources", prettifyGenericResources(topologyPageInfo.get_assigned_generic_resources()));
result.put("topologyStats", getTopologyStatsMap(topologyPageInfo.get_topology_stats()));
List<Map> workerSummaries = new ArrayList();
if (topologyPageInfo.is_set_workers()) {
for (WorkerSummary workerSummary : topologyPageInfo.get_workers()) {
workerSummaries.add(getWorkerSummaryMap(workerSummary, config));
}
}
result.put("workers", workerSummaries);
Map<String, ComponentAggregateStats> spouts = topologyPageInfo.get_id_to_spout_agg_stats();
List<Map> spoutStats = new ArrayList();
for (Map.Entry<String, ComponentAggregateStats> spoutEntry : spouts.entrySet()) {
spoutStats.add(getTopologySpoutAggStatsMap(spoutEntry.getValue(), spoutEntry.getKey()));
}
result.put("spouts", spoutStats);
Map<String, ComponentAggregateStats> bolts = topologyPageInfo.get_id_to_bolt_agg_stats();
List<Map> boltStats = new ArrayList();
for (Map.Entry<String, ComponentAggregateStats> boltEntry : bolts.entrySet()) {
boltStats.add(getTopologyBoltAggStatsMap(boltEntry.getValue(), boltEntry.getKey()));
}
result.put("bolts", boltStats);
result.put("configuration", topologyPageInfo.get_topology_conf());
boolean debuggingEnabled = false;
if (topologyPageInfo.is_set_debug_options()) {
debuggingEnabled = topologyPageInfo.get_debug_options().is_enable();
}
result.put("debug", debuggingEnabled);
double samplingPct = 10;
if (debuggingEnabled) {
samplingPct = topologyPageInfo.get_debug_options().get_samplingpct();
}
result.put("samplingPct", samplingPct);
result.put("replicationCount", topologyPageInfo.get_replication_count());
result.put("topologyVersion", topologyPageInfo.get_topology_version());
result.put("stormVersion", topologyPageInfo.get_storm_version());
return result;
}
/**
* getTopologyWorkers.
* @param topologyInfo topologyInfo
* @param config config
* @return getTopologyWorkers.
*/
public static Map<String, Object> getTopologyWorkers(TopologyInfo topologyInfo, Map config) {
List<Map> executorSummaries = new ArrayList();
for (ExecutorSummary executorSummary : topologyInfo.get_executors()) {
Map<String, Object> executorSummaryMap = new HashMap();
executorSummaryMap.put("host", executorSummary.get_host());
executorSummaryMap.put("port", executorSummary.get_port());
executorSummaries.add(executorSummaryMap);
}
HashSet hashSet = new HashSet();
hashSet.addAll(executorSummaries);
executorSummaries.clear();
executorSummaries.addAll(hashSet);
Map<String, Object> result = new HashMap();
result.put("hostPortList", executorSummaries);
addLogviewerInfo(config, result);
return result;
}
/**
* getTopologyLag.
* @param userTopology userTopology
* @param config config
* @return getTopologyLag.
*/
public static Map<String, Map<String, Object>> getTopologyLag(StormTopology userTopology, Map<String,Object> config) {
Boolean disableLagMonitoring = (Boolean)(config.get(DaemonConfig.UI_DISABLE_SPOUT_LAG_MONITORING));
return disableLagMonitoring ? Collections.EMPTY_MAP : TopologySpoutLag.lag(userTopology, config);
}
/**
* getBoltExecutors.
* @param executorSummaries executorSummaries
* @param stormTopology stormTopology
* @param sys sys
* @return getBoltExecutors.
*/
public static Map<String, List<ExecutorSummary>> getBoltExecutors(List<ExecutorSummary> executorSummaries,
StormTopology stormTopology, boolean sys) {
Map<String, List<ExecutorSummary>> result = new HashMap();
for (ExecutorSummary executorSummary : executorSummaries) {
if (StatsUtil.componentType(stormTopology, executorSummary.get_component_id()).equals("bolt")
&& (sys || !Utils.isSystemId(executorSummary.get_component_id()))) {
List<ExecutorSummary> executorSummaryList = result.getOrDefault(executorSummary.get_component_id(), new ArrayList());
executorSummaryList.add(executorSummary);
result.put(executorSummary.get_component_id(), executorSummaryList);
}
}
return result;
}
/**
* getSpoutExecutors.
* @param executorSummaries executorSummaries
* @param stormTopology stormTopology
* @return getSpoutExecutors.
*/
public static Map<String, List<ExecutorSummary>> getSpoutExecutors(List<ExecutorSummary> executorSummaries,
StormTopology stormTopology) {
Map<String, List<ExecutorSummary>> result = new HashMap();
for (ExecutorSummary executorSummary : executorSummaries) {
if (StatsUtil.componentType(stormTopology, executorSummary.get_component_id()).equals("spout")) {
List<ExecutorSummary> executorSummaryList = result.getOrDefault(executorSummary.get_component_id(), new ArrayList());
executorSummaryList.add(executorSummary);
result.put(executorSummary.get_component_id(), executorSummaryList);
}
}
return result;
}
/**
* sanitizeStreamName.
* @param streamName streamName
* @return sanitizeStreamName
*/
public static String sanitizeStreamName(String streamName) {
Pattern pattern = Pattern.compile("(?![A-Za-z_\\-:\\.]).");
Pattern pattern2 = Pattern.compile("^[A-Za-z]");
Matcher matcher = pattern2.matcher(streamName);
Matcher matcher2 = pattern.matcher("\\s" + streamName);
if (matcher.find()) {
matcher2 = pattern.matcher(streamName);
}
return matcher2.replaceAll("_");
}
/**
* sanitizeTransferredStats.
* @param stats stats
* @return sanitizeTransferredStats
*/
public static Map<String, Map<String,Long>> sanitizeTransferredStats(Map<String, Map<String,Long>> stats) {
Map<String, Map<String,Long>> result = new HashMap();
for (Map.Entry<String, Map<String,Long>> entry : stats.entrySet()) {
Map<String,Long> temp = new HashMap();
for (Map.Entry<String,Long> innerEntry : entry.getValue().entrySet()) {
temp.put(sanitizeStreamName(innerEntry.getKey()), innerEntry.getValue());
}
result.put(entry.getKey(), temp);
}
return result;
}
/**
* getStatMapFromExecutorSummary.
* @param executorSummary executorSummary
* @return getStatMapFromExecutorSummary
*/
public static Map<String, Object> getStatMapFromExecutorSummary(ExecutorSummary executorSummary) {
Map<String, Object> result = new HashMap();
result.put(":host", executorSummary.get_host());
result.put(":port", executorSummary.get_port());
result.put(":uptime_secs", executorSummary.get_uptime_secs());
result.put(":transferred", null);
if (executorSummary.is_set_stats()) {
result.put(":transferred", sanitizeTransferredStats(executorSummary.get_stats().get_transferred()));
}
return result;
}
/**
* getInputMap.
* @param entryInput entryInput
* @return getInputMap
*/
public static Map<String, Object> getInputMap(Map.Entry<GlobalStreamId,Grouping> entryInput) {
Map<String, Object> result = new HashMap();
result.put(":component", entryInput.getKey().get_componentId());
result.put(":stream", entryInput.getKey().get_streamId());
result.put(":sani-stream", sanitizeStreamName(entryInput.getKey().get_streamId()));
result.put(":grouping", entryInput.getValue().getSetField().getFieldName());
return result;
}
/**
* getVisualizationData.
* @param client client
* @param window window
* @param topoId topoId
* @param sys sys
* @return getVisualizationData
* @throws TException TException
*/
public static Map<String, Object> getVisualizationData(
Nimbus.Iface client, String window, String topoId, boolean sys) throws TException {
GetInfoOptions getInfoOptions = new GetInfoOptions();
getInfoOptions.set_num_err_choice(NumErrorsChoice.ONE);
TopologyInfo topologyInfo = client.getTopologyInfoWithOpts(topoId, getInfoOptions);
StormTopology stormTopology = client.getTopology(topoId);
Map<String, List<ExecutorSummary>> boltSummaries = getBoltExecutors(topologyInfo.get_executors(), stormTopology, sys);
Map<String, List<ExecutorSummary>> spoutSummaries = getSpoutExecutors(topologyInfo.get_executors(), stormTopology);
Map<String, SpoutSpec> spoutSpecs = stormTopology.get_spouts();
Map<String, Bolt> boltSpecs = stormTopology.get_bolts();
Map<String, Object> result = new HashMap();
for (Map.Entry<String, SpoutSpec> spoutSpecMapEntry : spoutSpecs.entrySet()) {
String spoutComponentId = spoutSpecMapEntry.getKey();
if (spoutSummaries.containsKey(spoutComponentId)) {
Map<String, Object> spoutData = new HashMap();
spoutData.put(":type", "spout");
spoutData.put(":capacity", 0);
Map<String, Map> spoutStreamsStats =
StatsUtil.spoutStreamsStats(spoutSummaries.get(spoutComponentId), sys);
spoutData.put(":latency", spoutStreamsStats.get("complete-latencies").get(window));
spoutData.put(":transferred", spoutStreamsStats.get("transferred").get(window));
spoutData.put(":stats", spoutSummaries.get(
spoutComponentId).stream().map(
UIHelpers::getStatMapFromExecutorSummary).collect(Collectors.toList()));
spoutData.put(
":link",
UIHelpers.urlFormat("/component.html?id=%s&topology_id=%s", spoutComponentId, topoId)
);
spoutData.put(":inputs",
spoutSpecMapEntry.getValue().get_common().get_inputs().entrySet().stream().map(
UIHelpers::getInputMap).collect(Collectors.toList())
);
result.put(spoutComponentId, spoutData);
}
}
for (Map.Entry<String, Bolt> boltEntry : boltSpecs.entrySet()) {
String boltComponentId = boltEntry.getKey();
if (boltSummaries.containsKey(boltComponentId) && (sys || !Utils.isSystemId(boltComponentId))) {
Map<String, Object> boltMap = new HashMap();
boltMap.put(":type", "bolt");
boltMap.put(":capacity", StatsUtil.computeBoltCapacity(boltSummaries.get(boltComponentId)));
Map<String, Map> boltStreamsStats =
StatsUtil.boltStreamsStats(boltSummaries.get(boltComponentId), sys);
boltMap.put(":latency", boltStreamsStats.get("process-latencies").get(window));
boltMap.put(":transferred", boltStreamsStats.get("transferred").get(window));
boltMap.put(":stats", boltSummaries.get(
boltComponentId).stream().map(
UIHelpers::getStatMapFromExecutorSummary).collect(Collectors.toList()));
boltMap.put(
":link",
UIHelpers.urlFormat("/component.html?id=%s&topology_id=%s", boltComponentId, topoId)
);
boltMap.put(":inputs",
boltEntry.getValue().get_common().get_inputs().entrySet().stream().map(
UIHelpers::getInputMap).collect(Collectors.toList())
);
result.put(boltComponentId, boltMap);
}
}
return result;
}
/**
* getStreamBox.
* @param visualization visualization
* @return getStreamBox
*/
public static Map<String, Object> getStreamBox(Object visualization) {
Map<String, Object> visualizationData = (Map<String, Object>) visualization;
Map<String, Object> result = new HashMap();
Map<String, Object> temp = (Map<String, Object>) visualizationData.get("inputs");
result.put("stream", temp.get("stream"));
result.put("sani-stream", temp.get("sani-stream"));
result.put("checked", !Utils.isSystemId((String) temp.get("stream")));
return result;
}
/**
* getBuildVisualization.
* @param client client
* @param config config
* @param window window
* @param id id
* @param sys sys
* @return getBuildVisualization
* @throws TException TException
*/
public static Map<String, Object> getBuildVisualization(
Nimbus.Iface client, Map<String, Object> config, String window, String id, boolean sys)
throws TException {
Map<String, Object> result = new HashMap();
Map<String, Object> visualizationData = getVisualizationData(client, window, id, sys);
List<Map> streamBoxes = visualizationData.entrySet().stream().map(UIHelpers::getStreamBox).collect(Collectors.toList());
result.put("visualizationTable", Lists.partition(streamBoxes, 4));
return result;
}
/**
* getActiveAction.
* @param profileRequest profileRequest
* @param config config
* @param topologyId topologyId
* @return getActiveAction
*/
public static Map<String, Object> getActiveAction(ProfileRequest profileRequest, Map config, String topologyId) {
Map<String, Object> result = new HashMap();
result.put("host", profileRequest.get_nodeInfo().get_node());
result.put("port", String.valueOf(profileRequest.get_nodeInfo().get_port().toArray()[0]));
result.put("dumplink",
getWorkerDumpLink(
profileRequest.get_nodeInfo().get_node(),
(Long) profileRequest.get_nodeInfo().get_port().toArray()[0], topologyId, config
));
result.put("timestamp", System.currentTimeMillis() - profileRequest.get_time_stamp());
return result;
}
/**
* getActiveProfileActions.
* @param client client
* @param id id
* @param component component
* @param config config
* @return getActiveProfileActions
* @throws TException TException
*/
public static List getActiveProfileActions(Nimbus.Iface client, String id, String component, Map config) throws TException {
List<ProfileRequest> profileRequests =
client.getComponentPendingProfileActions(id, component, ProfileAction.JPROFILE_STOP);
return profileRequests.stream().map(x -> UIHelpers.getActiveAction(x, config, id)).collect(Collectors.toList());
}
/**
* getWorkerDumpLink.
*
* @param host host
* @param port port
* @param topologyId topologyId
* @param config config
* @return getWorkerDumpLink
*/
public static String getWorkerDumpLink(String host, long port, String topologyId, Map<String, Object> config) {
if (isSecureLogviewer(config)) {
return UIHelpers.urlFormat(
"https://%s:%s/api/v1/dumps/%s/%s",
Utils.urlEncodeUtf8(host), config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT),
Utils.urlEncodeUtf8(topologyId),
Utils.urlEncodeUtf8(host)
+ ":" + Utils.urlEncodeUtf8(String.valueOf(port))
);
} else {
return UIHelpers.urlFormat(
"http://%s:%s/api/v1/dumps/%s/%s",
Utils.urlEncodeUtf8(host), config.get(DaemonConfig.LOGVIEWER_PORT),
Utils.urlEncodeUtf8(topologyId),
Utils.urlEncodeUtf8(host) + ":" + Utils.urlEncodeUtf8(String.valueOf(port))
);
}
}
/**
* unpackBoltPageInfo.
* @param componentPageInfo componentPageInfo
* @param topologyId topologyId
* @param window window
* @param sys sys
* @param config config
* @return unpackBoltPageInfo
*/
public static Map<String, Object> unpackBoltPageInfo(ComponentPageInfo componentPageInfo,
String topologyId, String window, boolean sys,
Map config) {
Map<String, Object> result = new HashMap<>();
result.put(
"boltStats",
componentPageInfo.get_window_to_stats().entrySet().stream().map(
e -> getBoltAggStatsMap(e.getValue(), e.getKey())
).collect(Collectors.toList())
);
result.put(
"inputStats",
componentPageInfo.get_gsid_to_input_stats().entrySet().stream().map(
e -> getBoltInputStats(e.getKey(), e.getValue())
).collect(Collectors.toList())
);
result.put(
"outputStats",
componentPageInfo.get_sid_to_output_stats().entrySet().stream().map(
e -> getBoltOutputStats(e.getKey(), e.getValue())
).collect(Collectors.toList())
);
result.put(
"executorStats",
componentPageInfo.get_exec_stats().stream().map(
e -> getBoltExecutorStats(topologyId, config, e)
).collect(Collectors.toList())
);
result.putAll(getComponentErrors(componentPageInfo.get_errors(), topologyId, config));
return result;
}
/**
* unpackSpoutPageInfo.
* @param componentPageInfo componentPageInfo
* @param topologyId topologyId
* @param window window
* @param sys sys
* @param config config
* @return unpackSpoutPageInfo
*/
public static Map<String, Object> unpackSpoutPageInfo(ComponentPageInfo componentPageInfo,
String topologyId, String window, boolean sys,
Map config) {
Map<String, Object> result = new HashMap<>();
result.put(
"spoutSummary",
componentPageInfo.get_window_to_stats().entrySet().stream().map(
e -> getSpoutAggStatsMap(e.getValue(), e.getKey())
).collect(Collectors.toList())
);
result.put(
"outputStats",
componentPageInfo.get_sid_to_output_stats().entrySet().stream().map(
e -> getSpoutOutputStats(e.getKey(), e.getValue())
).collect(Collectors.toList())
);
result.put(
"executorStats",
componentPageInfo.get_exec_stats().stream().map(
e -> getSpoutExecutorStats(topologyId, config, e)
).collect(Collectors.toList())
);
result.putAll(getComponentErrors(componentPageInfo.get_errors(), topologyId, config));
return result;
}
/**
* getComponentPage.
* @param client client
* @param id id
* @param component component
* @param window window
* @param sys sys
* @param user user
* @param config config
* @return getComponentPage
* @throws TException TException
*/
public static Map<String, Object> getComponentPage(
Nimbus.Iface client, String id, String component,
String window, boolean sys, String user, Map config) throws TException {
Map<String, Object> result = new HashMap();
ComponentPageInfo componentPageInfo = client.getComponentPageInfo(
id, component, window, sys
);
if (componentPageInfo.get_component_type().equals(ComponentType.BOLT)) {
result.putAll(unpackBoltPageInfo(componentPageInfo, id, window, sys, config));
} else if ((componentPageInfo.get_component_type().equals(ComponentType.SPOUT))) {
result.putAll(unpackSpoutPageInfo(componentPageInfo, id, window, sys, config));
}
result.put("user", user);
result.put("id" , component);
result.put("encodedId", Utils.urlEncodeUtf8(component));
result.put("name", componentPageInfo.get_topology_name());
result.put("executors", componentPageInfo.get_num_executors());
result.put("tasks", componentPageInfo.get_num_tasks());
result.put("requestedMemOnHeap",
componentPageInfo.get_resources_map().get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME));
result.put("requestedMemOffHeap",
componentPageInfo.get_resources_map().get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME));
result.put("requestedCpu",
componentPageInfo.get_resources_map().get(Constants.COMMON_CPU_RESOURCE_NAME));
result.put("requestedGenericResources",
prettifyGenericResources(componentPageInfo.get_resources_map()));
result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
result.put("topologyId", id);
result.put("topologyStatus", componentPageInfo.get_topology_status());
result.put("encodedTopologyId", Utils.urlEncodeUtf8(id));
result.put("window", window);
result.put("componentType", componentPageInfo.get_component_type().toString().toLowerCase());
result.put("windowHint", getWindowHint(window));
result.put("debug", componentPageInfo.is_set_debug_options() && componentPageInfo.get_debug_options().is_enable());
double samplingPct = 10;
if (componentPageInfo.is_set_debug_options()) {
samplingPct = componentPageInfo.get_debug_options().get_samplingpct();
}
result.put("samplingPct", samplingPct);
String eventlogHost = componentPageInfo.get_eventlog_host();
if (null != eventlogHost && !eventlogHost.isEmpty()) {
result.put("eventLogLink", getLogviewerLink(eventlogHost,
WebAppUtils.eventLogsFilename(id, String.valueOf(componentPageInfo.get_eventlog_port())),
config, componentPageInfo.get_eventlog_port()));
}
result.put("profilingAndDebuggingCapable", !Utils.isOnWindows());
result.put("profileActionEnabled", config.get(DaemonConfig.WORKER_PROFILER_ENABLED));
result.put("profilerActive", getActiveProfileActions(client, id, component, config));
return result;
}
/**
* getTopolgoyLogConfig.
* @param logConfig logConfig
* @return getTopolgoyLogConfig
*/
public static Map<String, Object> getTopolgoyLogConfig(LogConfig logConfig) {
Map<String, Object> result = new HashMap();
if (logConfig.is_set_named_logger_level()) {
for (Map.Entry<String, LogLevel> entry : logConfig.get_named_logger_level().entrySet()) {
Map temp = new HashMap();
temp.put("target_level", entry.getValue().get_target_log_level());
temp.put("reset_level", entry.getValue().get_reset_log_level());
temp.put("timeout", entry.getValue().get_reset_log_level_timeout_secs());
temp.put("timeout_epoch", entry.getValue().get_reset_log_level_timeout_epoch());
result.put(entry.getKey(), temp);
}
}
Map finalResult = new HashMap();
finalResult.put("namedLoggerLevels", result);
return finalResult;
}
/**
* getTopologyOpResponse.
* @param id id
* @param op op
* @return getTopologyOpResponse
*/
public static Map<String, Object> getTopologyOpResponse(String id, String op) {
Map<String, Object> result = new HashMap();
result.put("topologyOperation", op);
result.put("topologyId", id);
result.put("status", "success");
return result;
}
/**
* putTopologyActivate.
* @param client client
* @param id id
* @return putTopologyActivate
* @throws TException TException
*/
public static Map<String, Object> putTopologyActivate(Nimbus.Iface client, String id) throws TException {
GetInfoOptions getInfoOptions = new GetInfoOptions();
getInfoOptions.set_num_err_choice(NumErrorsChoice.NONE);
TopologyInfo topologyInfo = client.getTopologyInfoWithOpts(id, getInfoOptions);
client.activate(topologyInfo.get_name());
return getTopologyOpResponse(id, "activate");
}
/**
* putTopologyDeactivate.
* @param client client
* @param id id
* @return putTopologyDeactivate
* @throws TException TException
*/
public static Map<String, Object> putTopologyDeactivate(Nimbus.Iface client, String id) throws TException {
GetInfoOptions getInfoOptions = new GetInfoOptions();
getInfoOptions.set_num_err_choice(NumErrorsChoice.NONE);
TopologyInfo topologyInfo = client.getTopologyInfoWithOpts(id, getInfoOptions);
client.deactivate(topologyInfo.get_name());
return getTopologyOpResponse(id, "deactivate");
}
/**
* putTopologyDebugActionSpct.
* @param client client
* @param id id
* @param action action
* @param spct spct
* @param component component
* @return putTopologyDebugActionSpct
* @throws TException TException
*/
public static Map<String, Object> putTopologyDebugActionSpct(
Nimbus.Iface client, String id, String action, String spct, String component) throws TException {
GetInfoOptions getInfoOptions = new GetInfoOptions();
getInfoOptions.set_num_err_choice(NumErrorsChoice.NONE);
TopologyInfo topologyInfo = client.getTopologyInfoWithOpts(id, getInfoOptions);
client.debug(topologyInfo.get_name(), component, action.equals("enable"), Integer.parseInt(spct));
return getTopologyOpResponse(id, "debug/" + action);
}
/**
* putTopologyRebalance.
* @param client client
* @param id id
* @param waitTime waitTime
* @return putTopologyRebalance
* @throws TException TException
*/
public static Map<String, Object> putTopologyRebalance(
Nimbus.Iface client, String id, String waitTime) throws TException {
GetInfoOptions getInfoOptions = new GetInfoOptions();
getInfoOptions.set_num_err_choice(NumErrorsChoice.NONE);
TopologyInfo topologyInfo = client.getTopologyInfoWithOpts(id, getInfoOptions);
RebalanceOptions rebalanceOptions = new RebalanceOptions();
rebalanceOptions.set_wait_secs(Integer.parseInt(waitTime));
client.rebalance(topologyInfo.get_name(), rebalanceOptions);
return getTopologyOpResponse(id, "rebalance");
}
/**
* putTopologyKill.
* @param client client
* @param id id
* @param waitTime waitTime
* @return putTopologyKill
* @throws TException TException
*/
public static Map<String, Object> putTopologyKill(Nimbus.Iface client, String id, String waitTime) throws TException {
GetInfoOptions getInfoOptions = new GetInfoOptions();
getInfoOptions.set_num_err_choice(NumErrorsChoice.NONE);
TopologyInfo topologyInfo = client.getTopologyInfoWithOpts(id, getInfoOptions);
KillOptions killOptions = new KillOptions();
killOptions.set_wait_secs(Integer.parseInt(waitTime));
client.killTopologyWithOpts(topologyInfo.get_name(), killOptions);
return getTopologyOpResponse(id, "kill");
}
/**
* setTopologyProfilingAction.
* @param client client
* @param id id
* @param hostPort hostPort
* @param timestamp timestamp
* @param config config
* @param profileAction profileAction
* @throws TException TException
*/
public static void setTopologyProfilingAction(
Nimbus.Iface client, String id,
String hostPort, Long timestamp, Map<String,
Object> config, ProfileAction profileAction) throws TException {
String host = hostPort.split(":")[0];
Set<Long> ports = new HashSet();
String port = hostPort.split(":")[1];
ports.add(Long.valueOf(port));
NodeInfo nodeInfo = new NodeInfo(host, ports);
ProfileRequest profileRequest = new ProfileRequest(nodeInfo, profileAction);
profileRequest.set_time_stamp(timestamp);
client.setWorkerProfiler(id, profileRequest);
}
/**
* getTopologyProfilingStart.
* @param client client
* @param id id
* @param hostPort hostPort
* @param timeout timeout
* @param config config
* @return getTopologyProfilingStart
* @throws TException TException
*/
public static Map<String, Object> getTopologyProfilingStart(Nimbus.Iface client, String id,
String hostPort, String timeout,
Map<String, Object> config) throws TException {
setTopologyProfilingAction(
client, id , hostPort, System.currentTimeMillis() + (Long.valueOf(timeout) * 60_000),
config, ProfileAction.JPROFILE_STOP);
Map<String, Object> result = new HashMap();
String host = hostPort.split(":")[0];
String port = hostPort.split(":")[1];
result.put("status", "ok");
result.put("id", hostPort);
result.put("timeout", timeout);
result.put("dumplink", getWorkerDumpLink(host, Long.valueOf(port), id, config));
return result;
}
/**
* getTopologyProfilingStop.
* @param client client
* @param id id
* @param hostPort hostPort
* @param config config
* @return getTopologyProfilingStop
* @throws TException TException
*/
public static Map<String, Object> getTopologyProfilingStop(Nimbus.Iface client, String id,
String hostPort,
Map<String, Object> config) throws TException {
setTopologyProfilingAction(client, id , hostPort, 0L, config, ProfileAction.JPROFILE_STOP);
Map<String, Object> result = new HashMap();
result.put("status", "ok");
result.put("id", hostPort);
return result;
}
/**
* getProfilingDisabled.
* @return getProfilingDisabled
*/
public static Map<String, Object> getProfilingDisabled() {
Map<String, Object> result = new HashMap();
result.put("status", "disabled");
result.put("message", "Profiling is not enabled on this server");
return result;
}
/**
* getTopologyProfilingDump.
* @param client client
* @param id id
* @param hostPort hostPort
* @param config config
* @return getTopologyProfilingDump
* @throws TException TException
*/
public static Map<String, Object> getTopologyProfilingDump(Nimbus.Iface client, String id, String hostPort,
Map<String,Object> config) throws TException {
setTopologyProfilingAction(
client, id , hostPort, System.currentTimeMillis(),
config, ProfileAction.JPROFILE_DUMP
);
Map<String, Object> result = new HashMap();
result.put("status", "ok");
result.put("id", hostPort);
return result;
}
public static Map<String, Object> getTopologyProfilingDumpJstack(Nimbus.Iface client, String id,
String hostPort, Map<String,
Object> config) throws TException {
setTopologyProfilingAction(
client, id , hostPort, System.currentTimeMillis(), config, ProfileAction.JSTACK_DUMP
);
Map<String, Object> result = new HashMap();
result.put("status", "ok");
result.put("id", hostPort);
return result;
}
/**
* getTopologyProfilingRestartWorker.
* @param client client
* @param id id
* @param hostPort hostPort
* @param config config
* @return getTopologyProfilingRestartWorker
* @throws TException TException
*/
public static Map<String, Object> getTopologyProfilingRestartWorker(Nimbus.Iface client,
String id, String hostPort,
Map<String,Object> config) throws TException {
setTopologyProfilingAction(
client, id , hostPort, System.currentTimeMillis(), config, ProfileAction.JVM_RESTART
);
Map<String, Object> result = new HashMap();
result.put("status", "ok");
result.put("id", hostPort);
return result;
}
/**
* getTopologyProfilingDumpHeap.
* @param client client
* @param id id
* @param hostPort hostport
* @param config config
* @return getTopologyProfilingDumpHeap
* @throws TException TException
*/
public static Map<String, Object> getTopologyProfilingDumpHeap(Nimbus.Iface client, String id, String hostPort,
Map<String,Object> config) throws TException {
setTopologyProfilingAction(client, id , hostPort, System.currentTimeMillis(), config, ProfileAction.JMAP_DUMP);
Map<String, Object> result = new HashMap();
result.put("status", "ok");
result.put("id", hostPort);
return result;
}
/**
* putTopologyLogLevel.
* @param client client
* @param namedLogLevel namedLogLevel
* @param id id
* @return putTopologyLogLevel.
* @throws TException TException
*/
public static Map<String, Object> putTopologyLogLevel(Nimbus.Iface client,
Map<String, Map> namedLogLevel, String id) throws TException {
Map<String, Map> namedLoggerlevels = namedLogLevel;
for (Map.Entry<String, Map> entry : namedLoggerlevels.entrySet()) {
String loggerNMame = entry.getKey();
String targetLevel = (String) entry.getValue().get("target_level");
Long timeout = (Long) entry.getValue().get("timeout");
LogLevel logLevel = new LogLevel();
if (targetLevel == null) {
logLevel.set_action(LogLevelAction.REMOVE);
logLevel.unset_target_log_level();
} else {
logLevel.set_action(LogLevelAction.UPDATE);
logLevel.set_target_log_level(org.apache.logging.log4j.Level.toLevel(targetLevel).name());
logLevel.set_reset_log_level_timeout_secs(Math.toIntExact(timeout));
}
LogConfig logConfig = new LogConfig();
logConfig.put_to_named_logger_level(loggerNMame, logLevel);
client.setLogConfig(id, logConfig);
}
return UIHelpers.getTopolgoyLogConfig(client.getLogConfig(id));
}
/**
* getNimbusSummary.
* @param clusterInfo clusterInfo
* @param config config
* @return getNimbusSummary
*/
public static Map<String, Object> getNimbusSummary(ClusterSummary clusterInfo, Map<String,Object> config) {
List<NimbusSummary> nimbusSummaries = clusterInfo.get_nimbuses();
List<String> nimbusSeeds = new ArrayList();
for (String nimbusHost : (List<String>) config.get(Config.NIMBUS_SEEDS)) {
if (!Utils.isLocalhostAddress(nimbusHost)) {
nimbusSeeds.add(nimbusHost + ":" + config.get(Config.NIMBUS_THRIFT_PORT));
}
}
List<Map> resultSummaryList = new ArrayList();
for (NimbusSummary nimbusSummary : nimbusSummaries) {
Map<String, Object> nimbusSummaryMap = new HashMap();
nimbusSummaryMap.put("host", nimbusSummary.get_host());
nimbusSummaryMap.put("port", nimbusSummary.get_port());
String status = "Not a Leader";
if (nimbusSummary.is_isLeader()) {
status = "Leader";
}
nimbusSummaryMap.put("status", status);
nimbusSummaryMap.put("version", nimbusSummary.get_version());
nimbusSummaryMap.put("nimbusUpTimeSeconds", nimbusSummary.get_uptime_secs());
nimbusSummaryMap.put("nimbusUpTime", prettyUptimeSec(nimbusSummary.get_uptime_secs()));
nimbusSummaryMap.put("nimbusLogLink", getNimbusLogLink(nimbusSummary.get_host(), config));
resultSummaryList.add(nimbusSummaryMap);
nimbusSeeds.remove(nimbusSummary.get_host() + ":" + String.valueOf(nimbusSummary.get_port()));
}
for (String nimbusSeed : nimbusSeeds) {
Map<String, Object> nimbusSummaryMap = new HashMap();
nimbusSummaryMap.put("host", nimbusSeed.split(":")[0]);
nimbusSummaryMap.put("port", nimbusSeed.split(":")[1]);
nimbusSummaryMap.put("status", "Offline");
nimbusSummaryMap.put("version", "Not applicable");
nimbusSummaryMap.put("nimbusUpTimeSeconds", "Not applicable");
nimbusSummaryMap.put("nimbusUpTime", "Not applicable");
nimbusSummaryMap.put("nimbusLogLink", getNimbusLogLink(nimbusSeed.split(":")[0], config));
resultSummaryList.add(nimbusSummaryMap);
}
Map<String, Object> result = new HashMap();
result.put("nimbuses", resultSummaryList);
return result;
}
}