add status command
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
new file mode 100644
index 0000000..a856f69
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
@@ -0,0 +1,351 @@
+package org.apache.s4.tools;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.tools.S4ArgsBase.GradleOptsConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.Maps;
+
+public class Status extends S4ArgsBase {
+ static Logger logger = LoggerFactory.getLogger(Status.class);
+
+ private static String NONE = "--";
+
+ public static void main(String[] args) {
+
+ StatusArgs statusArgs = new StatusArgs();
+ Tools.parseArgs(statusArgs, args);
+
+ List<Cluster> clusterStatus = new ArrayList<Cluster>();
+ List<Stream> streamStatus = new ArrayList<Stream>();
+
+ try {
+ ZkClient zkClient = new ZkClient(statusArgs.zkConnectionString, statusArgs.timeout);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+
+ List<String> clusters = statusArgs.clusters;
+ if (clusters == null) {
+ // Load all subclusters
+ clusters = zkClient.getChildren("/s4/clusters");
+ }
+
+ Set<String> app = null;
+ Set<String> requiredAppCluster = new HashSet<String>();
+ if (statusArgs.apps != null) {
+ app = new HashSet<String>(statusArgs.apps);
+ }
+
+ for (String clusterName : clusters) {
+ try {
+ if (zkClient.exists("/s4/clusters/" + clusterName)) {
+ Cluster cluster = new Cluster(clusterName, zkClient);
+ if (app == null || app.contains(cluster.app.name)) {
+ clusterStatus.add(cluster);
+ requiredAppCluster.add(cluster.clusterName);
+ }
+ } else {
+ logger.error("/s4/clusters/" + clusterName + " doesn't exist");
+ }
+ } catch (Exception e) {
+ logger.error("Cannot get the status of " + clusterName, e);
+ }
+ }
+
+ List<String> streams = statusArgs.streams;
+ if (streams == null) {
+ // Load all streams published
+ streams = zkClient.getChildren("/s4/streams");
+ }
+
+ for (String streamName : streams) {
+ try {
+ if (zkClient.exists("/s4/streams/" + streamName)) {
+ Stream stream = new Stream(streamName, zkClient);
+ if (app == null) {
+ streamStatus.add(stream);
+ } else {
+ for (String cluster : requiredAppCluster) {
+ if (stream.containsCluster(cluster)) {
+ streamStatus.add(stream);
+ break;
+ }
+ }
+ }
+ } else {
+ logger.error("/s4/streams/" + streamName + " doesn't exist");
+ }
+ } catch (Exception e) {
+ logger.error("Cannot get the status of " + streamName, e);
+ }
+ }
+
+ System.out.println();
+ showAppsStatus(clusterStatus);
+ System.out.println("\n\n");
+ showClustersStatus(clusterStatus);
+ System.out.println("\n\n");
+ showStreamsStatus(streamStatus);
+ System.out.println("\n\n");
+
+ } catch (Exception e) {
+ logger.error("Cannot get the status of S4", e);
+ }
+
+ }
+
+ @Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators = "=")
+ static class StatusArgs extends S4ArgsBase {
+
+ @Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)", required = false)
+ List<String> apps;
+
+ @Parameter(names = { "-c", "-cluster" }, description = "Only show status of specified S4 cluster(s)", required = false)
+ List<String> clusters;
+
+ @Parameter(names = { "-s", "-stream" }, description = "Only show status of specified published stream(s)", required = false)
+ List<String> streams;
+
+ @Parameter(names = "-zk", description = "ZooKeeper connection string")
+ String zkConnectionString = "localhost:2181";
+
+ @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
+ int timeout = 10000;
+ }
+
+ private static void showAppsStatus(List<Cluster> clusters) {
+ System.out.println("App Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
+ System.out.println(generateEdge(130));
+ for (Cluster cluster : clusters) {
+ if (!NONE.equals(cluster.app.name)) {
+ System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
+ inMiddle(cluster.app.cluster, 20), cluster.app.uri);
+ }
+ }
+ System.out.println(generateEdge(130));
+
+ }
+
+ private static void showClustersStatus(List<Cluster> clusters) {
+ System.out.println("Cluster Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
+ System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20), inMiddle("App", 20), inMiddle("Tasks", 10),
+ generateEdge(80));
+ System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", inMiddle("Number", 8), inMiddle("Task id", 10),
+ inMiddle("Host", 50), inMiddle("Port", 8));
+ System.out.println(generateEdge(130));
+
+ for (Cluster cluster : clusters) {
+ System.out.format("%-20s%-20s%-10s%-10s", inMiddle(cluster.clusterName, 20),
+ inMiddle(cluster.app.name, 20), inMiddle("" + cluster.taskNumber, 8),
+ inMiddle("" + cluster.nodes.size(), 8));
+ boolean first = true;
+ for (ClusterNode node : cluster.nodes) {
+ if (first) {
+ first = false;
+ } else {
+ System.out.format("%n%-60s", " ");
+ }
+ System.out.format("%-10s%-50s%-10s", inMiddle("" + node.getTaskId(), 10),
+ inMiddle(node.getMachineName(), 50), inMiddle(node.getPort() + "", 10));
+ }
+ System.out.println();
+ }
+ System.out.println(generateEdge(130));
+ }
+
+ private static void showStreamsStatus(List<Stream> streams) {
+ System.out.println("Stream Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20), inMiddle("Producers", 55),
+ inMiddle("Consumers", 55));
+ System.out.println(generateEdge(130));
+
+ for (Stream stream : streams) {
+ System.out.format("%-20s%-55s%-55s%n", inMiddle(stream.streamName, 20),
+ inMiddle(getFormatString(stream.producers, stream.clusterAppMap), 55),
+ inMiddle(getFormatString(stream.consumers, stream.clusterAppMap), 55));
+ }
+ System.out.println(generateEdge(130));
+
+ }
+
+ private static String inMiddle(String content, int width) {
+ int i = (width - content.length()) / 2;
+ return String.format("%" + i + "s%s", " ", content);
+ }
+
+ private static String generateEdge(int length) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ sb.append("-");
+ }
+ return sb.toString();
+ }
+
+ /**
+ * show as cluster1(app1), cluster2(app2)
+ *
+ * @param clusters
+ * cluster list
+ * @param clusterAppMap
+ * <cluster,app>
+ * @return
+ */
+ private static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
+ if (clusters == null || clusters.size() == 0) {
+ return NONE;
+ } else {
+ // show as: cluster1(app1), cluster2(app2)
+ StringBuilder sb = new StringBuilder();
+ for (String cluster : clusters) {
+ String app = clusterAppMap.get(cluster);
+ sb.append(cluster);
+ if (!NONE.equals(app)) {
+ sb.append("(").append(app).append(")");
+ }
+ sb.append(" ");
+ }
+ return sb.toString();
+ }
+ }
+
+ static class Stream {
+
+ private final ZkClient zkClient;
+ private final String consumerPath;
+ private final String producerPath;
+
+ String streamName;
+ Set<String> producers = new HashSet<String>();// cluster name
+ Set<String> consumers = new HashSet<String>();// cluster name
+
+ Map<String, String> clusterAppMap = Maps.newHashMap();
+
+ public Stream(String streamName, ZkClient zkClient) throws Exception {
+ this.streamName = streamName;
+ this.zkClient = zkClient;
+ this.consumerPath = "/s4/streams/" + streamName + "/consumers";
+ this.producerPath = "/s4/streams/" + streamName + "/producers";
+ readStreamFromZk();
+ }
+
+ private void readStreamFromZk() throws Exception {
+ List<String> consumerNodes = zkClient.getChildren(consumerPath);
+ for (String node : consumerNodes) {
+ ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
+ consumers.add(consumer.getSimpleField("clusterName"));
+ }
+
+ List<String> producerNodes = zkClient.getChildren(producerPath);
+ for (String node : producerNodes) {
+ ZNRecord consumer = zkClient.readData(producerPath + "/" + node, true);
+ producers.add(consumer.getSimpleField("clusterName"));
+ }
+
+ getAppNames();
+ }
+
+ private void getAppNames() {
+ Set<String> clusters = new HashSet<String>(consumers);
+ clusters.addAll(producers);
+ for (String cluster : clusters) {
+ clusterAppMap.put(cluster, getApp(cluster, zkClient));
+ }
+ }
+
+ public boolean containsCluster(String cluster) {
+ if (producers.contains(cluster) || consumers.contains(cluster)) {
+ return true;
+ }
+ return false;
+ }
+
+ private static String getApp(String clusterName, ZkClient zkClient) {
+ String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+ if (zkClient.exists(appPath)) {
+ ZNRecord appRecord = zkClient.readData("/s4/clusters/" + clusterName + "/app/s4App");
+ return appRecord.getSimpleField("name");
+ }
+ return NONE;
+ }
+ }
+
+ static class App {
+ private String name = NONE;
+ private String cluster;
+ private String uri = NONE;
+ }
+
+ static class Cluster {
+ private final ZkClient zkClient;
+ private final String taskPath;
+ private final String processPath;
+ private final String appPath;
+
+ String clusterName;
+ int taskNumber;
+ App app;
+
+ List<ClusterNode> nodes = new ArrayList<ClusterNode>();
+
+ public Cluster(String clusterName, ZkClient zkClient) throws Exception {
+ this.clusterName = clusterName;
+ this.zkClient = zkClient;
+ this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
+ this.processPath = "/s4/clusters/" + clusterName + "/process";
+ this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+ readClusterFromZk();
+ }
+
+ public void readClusterFromZk() throws Exception {
+ List<String> processes;
+ List<String> tasks;
+
+ tasks = zkClient.getChildren(taskPath);
+ processes = zkClient.getChildren(processPath);
+
+ taskNumber = tasks.size();
+
+ for (int i = 0; i < processes.size(); i++) {
+ ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
+ if (process != null) {
+ int partition = Integer.parseInt(process.getSimpleField("partition"));
+ String host = process.getSimpleField("host");
+ int port = Integer.parseInt(process.getSimpleField("port"));
+ String taskId = process.getSimpleField("taskId");
+ ClusterNode node = new ClusterNode(partition, port, host, taskId);
+ nodes.add(node);
+ }
+ }
+
+ app = new App();
+ app.cluster = clusterName;
+ try {
+ ZNRecord appRecord = zkClient.readData(appPath);
+ app.name = appRecord.getSimpleField("name");
+ app.uri = appRecord.getSimpleField("s4r_uri");
+ } catch (ZkNoNodeException e) {
+ logger.warn(appPath + " doesn't exist");
+ }
+ }
+
+ }
+
+}
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index 7ddf48a..8815a33 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -22,7 +22,7 @@
enum Task {
deploy(Deploy.class), node(Main.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(null), newApp(
- CreateApp.class), s4r(Package.class);
+ CreateApp.class), s4r(Package.class), status(Status.class);
Class<?> target;