add status command
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
new file mode 100644
index 0000000..a856f69
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
@@ -0,0 +1,351 @@
+package org.apache.s4.tools;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.tools.S4ArgsBase.GradleOptsConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.Maps;
+
+public class Status extends S4ArgsBase {
+    static Logger logger = LoggerFactory.getLogger(Status.class);
+
+    private static String NONE = "--";
+
+    public static void main(String[] args) {
+
+        StatusArgs statusArgs = new StatusArgs();
+        Tools.parseArgs(statusArgs, args);
+
+        List<Cluster> clusterStatus = new ArrayList<Cluster>();
+        List<Stream> streamStatus = new ArrayList<Stream>();
+
+        try {
+            ZkClient zkClient = new ZkClient(statusArgs.zkConnectionString, statusArgs.timeout);
+            zkClient.setZkSerializer(new ZNRecordSerializer());
+
+            List<String> clusters = statusArgs.clusters;
+            if (clusters == null) {
+                // Load all subclusters
+                clusters = zkClient.getChildren("/s4/clusters");
+            }
+
+            Set<String> app = null;
+            Set<String> requiredAppCluster = new HashSet<String>();
+            if (statusArgs.apps != null) {
+                app = new HashSet<String>(statusArgs.apps);
+            }
+
+            for (String clusterName : clusters) {
+                try {
+                    if (zkClient.exists("/s4/clusters/" + clusterName)) {
+                        Cluster cluster = new Cluster(clusterName, zkClient);
+                        if (app == null || app.contains(cluster.app.name)) {
+                            clusterStatus.add(cluster);
+                            requiredAppCluster.add(cluster.clusterName);
+                        }
+                    } else {
+                        logger.error("/s4/clusters/" + clusterName + " doesn't exist");
+                    }
+                } catch (Exception e) {
+                    logger.error("Cannot get the status of " + clusterName, e);
+                }
+            }
+
+            List<String> streams = statusArgs.streams;
+            if (streams == null) {
+                // Load all streams published
+                streams = zkClient.getChildren("/s4/streams");
+            }
+
+            for (String streamName : streams) {
+                try {
+                    if (zkClient.exists("/s4/streams/" + streamName)) {
+                        Stream stream = new Stream(streamName, zkClient);
+                        if (app == null) {
+                            streamStatus.add(stream);
+                        } else {
+                            for (String cluster : requiredAppCluster) {
+                                if (stream.containsCluster(cluster)) {
+                                    streamStatus.add(stream);
+                                    break;
+                                }
+                            }
+                        }
+                    } else {
+                        logger.error("/s4/streams/" + streamName + " doesn't exist");
+                    }
+                } catch (Exception e) {
+                    logger.error("Cannot get the status of " + streamName, e);
+                }
+            }
+
+            System.out.println();
+            showAppsStatus(clusterStatus);
+            System.out.println("\n\n");
+            showClustersStatus(clusterStatus);
+            System.out.println("\n\n");
+            showStreamsStatus(streamStatus);
+            System.out.println("\n\n");
+
+        } catch (Exception e) {
+            logger.error("Cannot get the status of S4", e);
+        }
+
+    }
+
+    @Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators = "=")
+    static class StatusArgs extends S4ArgsBase {
+
+        @Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)", required = false)
+        List<String> apps;
+
+        @Parameter(names = { "-c", "-cluster" }, description = "Only show status of specified S4 cluster(s)", required = false)
+        List<String> clusters;
+
+        @Parameter(names = { "-s", "-stream" }, description = "Only show status of specified published stream(s)", required = false)
+        List<String> streams;
+
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
+        int timeout = 10000;
+    }
+
+    private static void showAppsStatus(List<Cluster> clusters) {
+        System.out.println("App Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
+        System.out.println(generateEdge(130));
+        for (Cluster cluster : clusters) {
+            if (!NONE.equals(cluster.app.name)) {
+                System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
+                        inMiddle(cluster.app.cluster, 20), cluster.app.uri);
+            }
+        }
+        System.out.println(generateEdge(130));
+
+    }
+
+    private static void showClustersStatus(List<Cluster> clusters) {
+        System.out.println("Cluster Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
+        System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20), inMiddle("App", 20), inMiddle("Tasks", 10),
+                generateEdge(80));
+        System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", inMiddle("Number", 8), inMiddle("Task id", 10),
+                inMiddle("Host", 50), inMiddle("Port", 8));
+        System.out.println(generateEdge(130));
+
+        for (Cluster cluster : clusters) {
+            System.out.format("%-20s%-20s%-10s%-10s", inMiddle(cluster.clusterName, 20),
+                    inMiddle(cluster.app.name, 20), inMiddle("" + cluster.taskNumber, 8),
+                    inMiddle("" + cluster.nodes.size(), 8));
+            boolean first = true;
+            for (ClusterNode node : cluster.nodes) {
+                if (first) {
+                    first = false;
+                } else {
+                    System.out.format("%n%-60s", " ");
+                }
+                System.out.format("%-10s%-50s%-10s", inMiddle("" + node.getTaskId(), 10),
+                        inMiddle(node.getMachineName(), 50), inMiddle(node.getPort() + "", 10));
+            }
+            System.out.println();
+        }
+        System.out.println(generateEdge(130));
+    }
+
+    private static void showStreamsStatus(List<Stream> streams) {
+        System.out.println("Stream Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20), inMiddle("Producers", 55),
+                inMiddle("Consumers", 55));
+        System.out.println(generateEdge(130));
+
+        for (Stream stream : streams) {
+            System.out.format("%-20s%-55s%-55s%n", inMiddle(stream.streamName, 20),
+                    inMiddle(getFormatString(stream.producers, stream.clusterAppMap), 55),
+                    inMiddle(getFormatString(stream.consumers, stream.clusterAppMap), 55));
+        }
+        System.out.println(generateEdge(130));
+
+    }
+
+    private static String inMiddle(String content, int width) {
+        int i = (width - content.length()) / 2;
+        return String.format("%" + i + "s%s", " ", content);
+    }
+
+    private static String generateEdge(int length) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < length; i++) {
+            sb.append("-");
+        }
+        return sb.toString();
+    }
+
+    /**
+     * show as cluster1(app1), cluster2(app2)
+     *
+     * @param clusters
+     *            cluster list
+     * @param clusterAppMap
+     *            <cluster,app>
+     * @return
+     */
+    private static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
+        if (clusters == null || clusters.size() == 0) {
+            return NONE;
+        } else {
+            // show as: cluster1(app1), cluster2(app2)
+            StringBuilder sb = new StringBuilder();
+            for (String cluster : clusters) {
+                String app = clusterAppMap.get(cluster);
+                sb.append(cluster);
+                if (!NONE.equals(app)) {
+                    sb.append("(").append(app).append(")");
+                }
+                sb.append(" ");
+            }
+            return sb.toString();
+        }
+    }
+
+    static class Stream {
+
+        private final ZkClient zkClient;
+        private final String consumerPath;
+        private final String producerPath;
+
+        String streamName;
+        Set<String> producers = new HashSet<String>();// cluster name
+        Set<String> consumers = new HashSet<String>();// cluster name
+
+        Map<String, String> clusterAppMap = Maps.newHashMap();
+
+        public Stream(String streamName, ZkClient zkClient) throws Exception {
+            this.streamName = streamName;
+            this.zkClient = zkClient;
+            this.consumerPath = "/s4/streams/" + streamName + "/consumers";
+            this.producerPath = "/s4/streams/" + streamName + "/producers";
+            readStreamFromZk();
+        }
+
+        private void readStreamFromZk() throws Exception {
+            List<String> consumerNodes = zkClient.getChildren(consumerPath);
+            for (String node : consumerNodes) {
+                ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
+                consumers.add(consumer.getSimpleField("clusterName"));
+            }
+
+            List<String> producerNodes = zkClient.getChildren(producerPath);
+            for (String node : producerNodes) {
+                ZNRecord consumer = zkClient.readData(producerPath + "/" + node, true);
+                producers.add(consumer.getSimpleField("clusterName"));
+            }
+
+            getAppNames();
+        }
+
+        private void getAppNames() {
+            Set<String> clusters = new HashSet<String>(consumers);
+            clusters.addAll(producers);
+            for (String cluster : clusters) {
+                clusterAppMap.put(cluster, getApp(cluster, zkClient));
+            }
+        }
+
+        public boolean containsCluster(String cluster) {
+            if (producers.contains(cluster) || consumers.contains(cluster)) {
+                return true;
+            }
+            return false;
+        }
+
+        private static String getApp(String clusterName, ZkClient zkClient) {
+            String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+            if (zkClient.exists(appPath)) {
+                ZNRecord appRecord = zkClient.readData("/s4/clusters/" + clusterName + "/app/s4App");
+                return appRecord.getSimpleField("name");
+            }
+            return NONE;
+        }
+    }
+
+    static class App {
+        private String name = NONE;
+        private String cluster;
+        private String uri = NONE;
+    }
+
+    static class Cluster {
+        private final ZkClient zkClient;
+        private final String taskPath;
+        private final String processPath;
+        private final String appPath;
+
+        String clusterName;
+        int taskNumber;
+        App app;
+
+        List<ClusterNode> nodes = new ArrayList<ClusterNode>();
+
+        public Cluster(String clusterName, ZkClient zkClient) throws Exception {
+            this.clusterName = clusterName;
+            this.zkClient = zkClient;
+            this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
+            this.processPath = "/s4/clusters/" + clusterName + "/process";
+            this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+            readClusterFromZk();
+        }
+
+        public void readClusterFromZk() throws Exception {
+            List<String> processes;
+            List<String> tasks;
+
+            tasks = zkClient.getChildren(taskPath);
+            processes = zkClient.getChildren(processPath);
+
+            taskNumber = tasks.size();
+
+            for (int i = 0; i < processes.size(); i++) {
+                ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
+                if (process != null) {
+                    int partition = Integer.parseInt(process.getSimpleField("partition"));
+                    String host = process.getSimpleField("host");
+                    int port = Integer.parseInt(process.getSimpleField("port"));
+                    String taskId = process.getSimpleField("taskId");
+                    ClusterNode node = new ClusterNode(partition, port, host, taskId);
+                    nodes.add(node);
+                }
+            }
+
+            app = new App();
+            app.cluster = clusterName;
+            try {
+                ZNRecord appRecord = zkClient.readData(appPath);
+                app.name = appRecord.getSimpleField("name");
+                app.uri = appRecord.getSimpleField("s4r_uri");
+            } catch (ZkNoNodeException e) {
+                logger.warn(appPath + " doesn't exist");
+            }
+        }
+
+    }
+
+}
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index 7ddf48a..8815a33 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -22,7 +22,7 @@
 
     enum Task {
         deploy(Deploy.class), node(Main.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(null), newApp(
-                CreateApp.class), s4r(Package.class);
+                CreateApp.class), s4r(Package.class), status(Status.class);
 
         Class<?> target;