blob: f206802c406c648d3905b2d04639ddbda6222f8b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.utils;
import java.util.HashSet;
import java.util.Map;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.GetInfoOptions;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.NumErrorsChoice;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologySummary;
public class Monitor {
private static final String WATCH_TRANSFERRED = "transferred";
private static final String WATCH_EMITTED = "emitted";
private int interval = 4;
private String topology;
private String component;
private String stream;
private String watch;
private HashSet<String> getComponents(Nimbus.Iface client, String topology) throws Exception {
HashSet<String> components = new HashSet<>();
ClusterSummary clusterSummary = client.getClusterInfo();
TopologySummary topologySummary = null;
for (TopologySummary ts : clusterSummary.get_topologies()) {
if (topology.equals(ts.get_name())) {
topologySummary = ts;
break;
}
}
if (topologySummary == null) {
throw new IllegalArgumentException("topology: " + topology + " not found");
} else {
String id = topologySummary.get_id();
GetInfoOptions getInfoOpts = new GetInfoOptions();
getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
for (ExecutorSummary es : info.get_executors()) {
components.add(es.get_component_id());
}
}
return components;
}
public void metrics(Nimbus.Iface client) throws Exception {
if (interval <= 0) {
throw new IllegalArgumentException("poll interval must be positive");
}
if (topology == null || topology.isEmpty()) {
throw new IllegalArgumentException("topology name must be something");
}
if (component == null || component.isEmpty()) {
HashSet<String> components = getComponents(client, topology);
System.out.println("Available components for " + topology + " :");
System.out.println("------------------");
for (String comp : components) {
System.out.println(comp);
}
System.out.println("------------------");
System.out.println("Please use -m to specify one component");
return;
}
if (stream == null || stream.isEmpty()) {
throw new IllegalArgumentException("stream name must be something");
}
if (!WATCH_TRANSFERRED.equals(watch) && !WATCH_EMITTED.equals(watch)) {
throw new IllegalArgumentException("watch item must either be transferred or emitted");
}
System.out.println("topology\tcomponent\tparallelism\tstream\ttime-diff ms\t" + watch + "\tthroughput (Kt/s)");
long pollMs = interval * 1000;
long now = System.currentTimeMillis();
MetricsState state = new MetricsState(now, 0);
Poller poller = new Poller(now, pollMs);
do {
metrics(client, now, state);
try {
now = poller.nextPoll();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
} while (true);
}
public void metrics(Nimbus.Iface client, long now, MetricsState state) throws Exception {
long totalStatted = 0;
int componentParallelism = 0;
boolean streamFound = false;
ClusterSummary clusterSummary = client.getClusterInfo();
TopologySummary topologySummary = null;
for (TopologySummary ts : clusterSummary.get_topologies()) {
if (topology.equals(ts.get_name())) {
topologySummary = ts;
break;
}
}
if (topologySummary == null) {
throw new IllegalArgumentException("topology: " + topology + " not found");
} else {
String id = topologySummary.get_id();
GetInfoOptions getInfoOpts = new GetInfoOptions();
getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
for (ExecutorSummary es : info.get_executors()) {
if (component.equals(es.get_component_id())) {
componentParallelism++;
ExecutorStats stats = es.get_stats();
if (stats != null) {
Map<String, Map<String, Long>> statted =
WATCH_EMITTED.equals(watch) ? stats.get_emitted() : stats.get_transferred();
if (statted != null) {
Map<String, Long> e2 = statted.get(":all-time");
if (e2 != null) {
Long stream = e2.get(this.stream);
if (stream != null) {
streamFound = true;
totalStatted += stream;
}
}
}
}
}
}
}
if (componentParallelism <= 0) {
HashSet<String> components = getComponents(client, topology);
System.out.println("Available components for " + topology + " :");
System.out.println("------------------");
for (String comp : components) {
System.out.println(comp);
}
System.out.println("------------------");
throw new IllegalArgumentException("component: " + component + " not found");
}
if (!streamFound) {
throw new IllegalArgumentException("stream: " + stream + " not found");
}
long timeDelta = now - state.getLastTime();
long stattedDelta = totalStatted - state.getLastStatted();
state.setLastTime(now);
state.setLastStatted(totalStatted);
double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double) stattedDelta / (double) timeDelta);
System.out.println(topology + "\t"
+ component + "\t"
+ componentParallelism + "\t"
+ stream + "\t"
+ timeDelta + "\t"
+ stattedDelta + "\t"
+ throughput);
}
public void setInterval(int interval) {
this.interval = interval;
}
public void setTopology(String topology) {
this.topology = topology;
}
public void setComponent(String component) {
this.component = component;
}
public void setStream(String stream) {
this.stream = stream;
}
public void setWatch(String watch) {
this.watch = watch;
}
private static class MetricsState {
private long lastTime = 0;
private long lastStatted = 0;
private MetricsState(long lastTime, long lastStatted) {
this.lastTime = lastTime;
this.lastStatted = lastStatted;
}
public long getLastStatted() {
return lastStatted;
}
public void setLastStatted(long lastStatted) {
this.lastStatted = lastStatted;
}
public long getLastTime() {
return lastTime;
}
public void setLastTime(long lastTime) {
this.lastTime = lastTime;
}
}
private static class Poller {
private long startTime = 0;
private long pollMs = 0;
private Poller(long startTime, long pollMs) {
this.startTime = startTime;
this.pollMs = pollMs;
}
public long nextPoll() throws InterruptedException {
long now = System.currentTimeMillis();
long cycle = (now - startTime) / pollMs;
long wakeupTime = startTime + (pollMs * (cycle + 1));
long sleepTime = wakeupTime - now;
if (sleepTime > 0) {
Thread.sleep(sleepTime);
}
now = System.currentTimeMillis();
return now;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public long getPollMs() {
return pollMs;
}
public void setPollMs(long pollMs) {
this.pollMs = pollMs;
}
}
}