/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/

package org.apache.storm.perf.utils;

import java.util.List;
import java.util.Map;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.ExecutorSpecificStats;
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.SpoutStats;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.utils.Utils;
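
/**
 * A point-in-time snapshot of the metrics reported by a running topology, built from the
 * summaries returned by Nimbus. Counters are read from the ":all-time" stats window, and the
 * transferred/emitted totals exclude Storm's internal system streams. Instances are created
 * via {@link #factory(Nimbus.Iface, String)}.
 */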
public class MetricsSample {

    private long sampleTime = -1L;
    private long totalTransferred = 0L;
    private long totalEmitted = 0L;
    private long totalAcked = 0L;
    private long totalFailed = 0L;
    private double totalLatency;
    private long spoutEmitted = 0L;
    private long spoutTransferred = 0L;
    private int spoutExecutors = 0;
    private int numSupervisors = 0;
    private int numWorkers = 0;
    private int numTasks = 0;
    private int numExecutors = 0;
    private int totalSlots = 0;
    private int usedSlots = 0;
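
    /**
     * Polls Nimbus for the current cluster and topology state and returns a fresh
     * {@link MetricsSample} for the topology with the given name.
     */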
    public static MetricsSample factory(Nimbus.Iface client, String topologyName) throws Exception {
        // sample cluster- and topology-level metrics from Nimbus
        ClusterSummary clusterSummary = client.getClusterInfo();

        // get topology info
        TopologySummary topSummary = getTopologySummary(clusterSummary, topologyName);
        if (topSummary == null) {
            throw new IllegalArgumentException("No running topology named '" + topologyName + "'");
        }
        int topologyExecutors = topSummary.get_num_executors();
        int topologyWorkers = topSummary.get_num_workers();
        int topologyTasks = topSummary.get_num_tasks();

        TopologyInfo topInfo = client.getTopologyInfo(topSummary.get_id());
        MetricsSample sample = getMetricsSample(topInfo);
        sample.numWorkers = topologyWorkers;
        sample.numExecutors = topologyExecutors;
        sample.numTasks = topologyTasks;
        return sample;
    }
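
    /**
     * Aggregates the executor-level stats in the given {@link TopologyInfo} into a single sample.
     * Worker, executor and task counts are filled in by the caller.
     */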
    private static MetricsSample getMetricsSample(TopologyInfo topInfo) {
        List<ExecutorSummary> executorSummaries = topInfo.get_executors();

        // topology-wide totals
        long totalTransferred = 0L;
        long totalEmitted = 0L;
        long totalAcked = 0L;
        long totalFailed = 0L;

        // spout-only aggregates
        int spoutExecCount = 0;
        double spoutLatencySum = 0.0;
        long spoutEmitted = 0L;
        long spoutTransferred = 0L;

        // aggregate the stats reported by each executor
        for (ExecutorSummary executorSummary : executorSummaries) {
            ExecutorStats executorStats = executorSummary.get_stats();
            if (executorStats == null) {
                continue;
            }
            ExecutorSpecificStats executorSpecificStats = executorStats.get_specific();
            if (executorSpecificStats == null) {
                continue;
            }
            boolean isSpout = executorSpecificStats.is_set_spout();

            // transferred totals for the all-time window, skipping Storm's internal system streams
            Map<String, Long> txMap = executorStats.get_transferred().get(":all-time");
            if (txMap == null) {
                continue;
            }
            for (Map.Entry<String, Long> entry : txMap.entrySet()) {
                // todo, ignore the master batch coordinator ?
                if (!Utils.isSystemId(entry.getKey())) {
                    long count = entry.getValue();
                    totalTransferred += count;
                    if (isSpout) {
                        spoutTransferred += count;
                    }
                }
            }

            // emitted totals, aggregated the same way so the emitted getters report real values
            Map<String, Long> emittedMap = executorStats.get_emitted().get(":all-time");
            if (emittedMap != null) {
                for (Map.Entry<String, Long> entry : emittedMap.entrySet()) {
                    if (!Utils.isSystemId(entry.getKey())) {
                        long count = entry.getValue();
                        totalEmitted += count;
                        if (isSpout) {
                            spoutEmitted += count;
                        }
                    }
                }
            }

            // spout executors additionally report acked/failed counts and complete latency
            if (isSpout) {
                SpoutStats spoutStats = executorSpecificStats.get_spout();
                Map<String, Long> acked = spoutStats.get_acked().get(":all-time");
                if (acked != null) {
                    for (Long count : acked.values()) {
                        totalAcked += count;
                    }
                }
                Map<String, Long> failed = spoutStats.get_failed().get(":all-time");
                if (failed != null) {
                    for (Long count : failed.values()) {
                        totalFailed += count;
                    }
                }
                // average the all-time complete latency across this spout's streams
                Map<String, Double> vals = spoutStats.get_complete_ms_avg().get(":all-time");
                if (vals != null && !vals.isEmpty()) {
                    double total = 0.0;
                    for (Double val : vals.values()) {
                        total += val;
                    }
                    spoutLatencySum += total / vals.size();
                }
                spoutExecCount++;
            }
        }

        MetricsSample ret = new MetricsSample();
        ret.totalEmitted = totalEmitted;
        ret.totalTransferred = totalTransferred;
        ret.totalAcked = totalAcked;
        ret.totalFailed = totalFailed;
        // avoid NaN when no spout executors reported complete latency
        ret.totalLatency = spoutExecCount > 0 ? spoutLatencySum / spoutExecCount : 0.0;
        ret.spoutEmitted = spoutEmitted;
        ret.spoutTransferred = spoutTransferred;
        ret.spoutExecutors = spoutExecCount;
        ret.sampleTime = System.currentTimeMillis();
        // supervisor and slot counts are not available from TopologyInfo; worker, executor
        // and task counts are overwritten by factory() from the TopologySummary
        ret.numWorkers = 0;
        ret.numExecutors = 0;
        ret.numTasks = 0;
        return ret;
    }
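
    /**
     * Finds the {@link TopologySummary} for the topology with the given name, or returns
     * null if no such topology is running.
     */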
    public static TopologySummary getTopologySummary(ClusterSummary cs, String name) {
        for (TopologySummary ts : cs.get_topologies()) {
            if (name.equals(ts.get_name())) {
                return ts;
            }
        }
        return null;
    }

    // getters

    public long getSampleTime() {
        return sampleTime;
    }

    public long getTotalTransferred() {
        return totalTransferred;
    }

    public long getTotalEmitted() {
        return totalEmitted;
    }

    public long getTotalAcked() {
        return totalAcked;
    }

    public long getTotalFailed() {
        return totalFailed;
    }

    public double getTotalLatency() {
        return totalLatency;
    }

    public long getSpoutEmitted() {
        return spoutEmitted;
    }

    public long getSpoutTransferred() {
        return spoutTransferred;
    }

    public int getNumSupervisors() {
        return numSupervisors;
    }

    public int getNumWorkers() {
        return numWorkers;
    }

    public int getNumTasks() {
        return numTasks;
    }

    public int getTotalSlots() {
        return totalSlots;
    }

    public int getSpoutExecutors() {
        return spoutExecutors;
    }

    public int getNumExecutors() {
        return numExecutors;
    }

    public int getUsedSlots() {
        return usedSlots;
    }
}