blob: bd4224eb5559f68e998959da6693abb7f8f0cc8c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.s4.tools;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.ConfigScope;
import org.apache.helix.ConfigScopeBuilder;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.tools.S4ArgsBase.GradleOptsConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.collect.Maps;
public class S4Status extends S4ArgsBase
{
static Logger logger = LoggerFactory.getLogger(S4Status.class);
private static String NONE = "--";
public static void main(String[] args)
{
StatusArgs statusArgs = new StatusArgs();
Tools.parseArgs(statusArgs, args);
try
{
if (statusArgs.clusters.size() > 0)
{
for (String cluster : statusArgs.clusters)
{
HelixManager manager = HelixManagerFactory.getZKHelixManager(cluster,
"ADMIN", InstanceType.ADMINISTRATOR,
statusArgs.zkConnectionString);
manager.connect();
ConfigAccessor configAccessor = manager.getConfigAccessor();
ConfigScopeBuilder builder = new ConfigScopeBuilder();
printClusterInfo(manager, cluster);
List<String> resourcesInCluster = manager.getClusterManagmentTool()
.getResourcesInCluster(cluster);
List<String> apps = new ArrayList<String>();
List<String> tasks = new ArrayList<String>();
for (String resource : resourcesInCluster)
{
ConfigScope scope = builder.forCluster(cluster)
.forResource(resource).build();
String resourceType = configAccessor.get(scope, "type");
if ("App".equals(resourceType))
{
apps.add(resource);
} else if ("Task".equals(resourceType))
{
tasks.add(resource);
}
}
if (statusArgs.apps == null && statusArgs.streams == null)
{
statusArgs.apps = apps;
statusArgs.streams = tasks;
}
for (String app : statusArgs.apps)
{
if (resourcesInCluster.contains(app))
{
printAppInfo(manager, cluster, app);
}
}
if (statusArgs.streams != null && statusArgs.streams.size() > 0)
{
for (String stream : statusArgs.streams)
{
if (resourcesInCluster.contains(stream))
{
printStreamInfo(manager, cluster, stream);
}
}
}
manager.disconnect();
}
}
} catch (Exception e)
{
logger.error("Cannot get the status of S4", e);
}
}
private static void printStreamInfo(HelixManager manager, String cluster,
String taskId)
{
ConfigAccessor configAccessor = manager.getConfigAccessor();
ConfigScopeBuilder builder = new ConfigScopeBuilder();
ConfigScope scope = builder.forCluster(cluster).forResource(taskId).build();
String streamName = configAccessor.get(scope, "streamName");
String taskType = configAccessor.get(scope, "taskType");
System.out.println("Task Status");
System.out.println(generateEdge(130));
System.out.format("%-20s%-20s%-90s%n", inMiddle("Task Id", 20),
inMiddle("Cluster", 20), inMiddle("Description", 90));
System.out.println(generateEdge(130));
System.out.format("%-20s%-20s%-90s%n", inMiddle(taskId, 20),
inMiddle(cluster, 20), inMiddle(streamName + " " + taskType, 90));
System.out.println(generateEdge(130));
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
IdealState assignment = helixDataAccessor.getProperty(keyBuilder
.idealStates(taskId));
ExternalView view = helixDataAccessor.getProperty(keyBuilder
.externalView(taskId));
List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder
.liveInstances());
System.out.format("%-50s%-100s%n", inMiddle("Partition", 50),
inMiddle("State", 20));
System.out.println(generateEdge(130));
for (String partition : assignment.getPartitionSet())
{
Map<String, String> stateMap = view.getStateMap(partition);
StringBuilder sb = new StringBuilder();
String delim="";
for (String instance : stateMap.keySet())
{
sb.append(delim);
String state = stateMap.get(instance);
if (liveInstances.contains(instance))
{
sb.append(instance).append(":").append(state);
} else
{
sb.append(instance).append(":").append("OFFLINE");
}
delim=", ";
}
System.out.format("%-50s%-10s%n", inMiddle(partition, 50),
inMiddle(sb.toString(), 100));
}
System.out.println(generateEdge(130));
}
private static void printAppInfo(HelixManager manager, String cluster,
String app)
{
ConfigAccessor configAccessor = manager.getConfigAccessor();
ConfigScopeBuilder builder = new ConfigScopeBuilder();
ConfigScope scope = builder.forCluster(cluster).forResource(app).build();
String uri = configAccessor.get(scope, "s4r_uri");
System.out.println("App Status");
System.out.println(generateEdge(130));
System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20),
inMiddle("Cluster", 20), inMiddle("URI", 90));
System.out.println(generateEdge(130));
System.out.format("%-20s%-20s%-90s%n", inMiddle(app, 20),
inMiddle(cluster, 20), inMiddle(uri, 90));
System.out.println(generateEdge(130));
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
IdealState assignment = helixDataAccessor.getProperty(keyBuilder
.idealStates(app));
ExternalView view = helixDataAccessor.getProperty(keyBuilder
.externalView(app));
List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder
.liveInstances());
Map<String, String> assignmentMap = assignment.getInstanceStateMap(app);
Map<String, String> appStateMap = view.getStateMap(app);
System.out.format("%-50s%-20s%n", inMiddle("Node id", 50),
inMiddle("DEPLOYED", 20));
System.out.println(generateEdge(130));
for (String instance : assignmentMap.keySet())
{
String state = appStateMap.get(instance);
System.out
.format(
"%-50s%-10s%n",
inMiddle(instance, 50),
inMiddle((("ONLINE".equals(state) && liveInstances
.contains(instance)) ? "Y" : "N"), 20));
}
System.out.println(generateEdge(130));
}
private static void printClusterInfo(HelixManager manager, String cluster)
{
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = dataAccessor.keyBuilder();
List<String> instances = dataAccessor.getChildNames(keyBuilder
.instanceConfigs());
List<String> liveInstances = dataAccessor.getChildNames(keyBuilder
.liveInstances());
if (liveInstances == null)
{
liveInstances = Collections.emptyList();
}
System.out.println("Cluster Status");
System.out.println(generateEdge(130));
System.out.format("%-50s%-80s%n", " ", inMiddle("Nodes", 80));
System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Cluster Name", 20),
inMiddle("Nodes", 20), inMiddle("Active", 10), generateEdge(80));
System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ",
inMiddle("Node id", 10), inMiddle("Host", 50), inMiddle("Port", 8),
inMiddle("Active", 10));
System.out.println(generateEdge(130));
System.out.format("%-20s%-20s%-10s", inMiddle(cluster, 20),
inMiddle("" + instances.size(), 8),
inMiddle("" + liveInstances.size(), 8));
boolean first = true;
for (String instance : instances)
{
InstanceConfig config = dataAccessor.getProperty(keyBuilder
.instanceConfig(instance));
// System.out.println(config);
if (first)
{
first = false;
} else
{
System.out.format("%n%-50s", " ");
}
System.out
.format(
"%-10s%-46s%-10s%-10s",
inMiddle("" + config.getId(), 10),
inMiddle(config.getHostName(), 50),
inMiddle(config.getPort() + "", 10),
inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y"
: "N", 10));
}
System.out.println();
}
@Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators = "=")
static class StatusArgs extends S4ArgsBase
{
@Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)", required = false)
List<String> apps;
@Parameter(names = { "-c", "-cluster" }, description = "Only show status of specified S4 cluster(s)", required = true)
List<String> clusters;
@Parameter(names = { "-s", "-stream" }, description = "Only show status of specified published stream(s)", required = false)
List<String> streams;
@Parameter(names = "-zk", description = "ZooKeeper connection string")
String zkConnectionString = "localhost:2181";
@Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
int timeout = 10000;
}
private static void showAppsStatus(List<Cluster> clusters)
{
System.out.println("App Status");
System.out.println(generateEdge(130));
System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20),
inMiddle("Cluster", 20), inMiddle("URI", 90));
System.out.println(generateEdge(130));
for (Cluster cluster : clusters)
{
if (!NONE.equals(cluster.app.name))
{
System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
inMiddle(cluster.app.cluster, 20), cluster.app.uri);
}
}
System.out.println(generateEdge(130));
}
private static void showClustersStatus(List<Cluster> clusters)
{
System.out.println("Cluster Status");
System.out.println(generateEdge(130));
System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20),
inMiddle("App", 20), inMiddle("Tasks", 10), generateEdge(80));
System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ",
inMiddle("Number", 8), inMiddle("Task id", 10), inMiddle("Host", 50),
inMiddle("Port", 8));
System.out.println(generateEdge(130));
for (Cluster cluster : clusters)
{
System.out.format("%-20s%-20s%-10s%-10s",
inMiddle(cluster.clusterName, 20), inMiddle(cluster.app.name, 20),
inMiddle("" + cluster.taskNumber, 8),
inMiddle("" + cluster.nodes.size(), 8));
boolean first = true;
for (ClusterNode node : cluster.nodes)
{
if (first)
{
first = false;
} else
{
System.out.format("%n%-60s", " ");
}
System.out.format("%-10s%-50s%-10s",
inMiddle("" + node.getTaskId(), 10),
inMiddle(node.getMachineName(), 50),
inMiddle(node.getPort() + "", 10));
}
System.out.println();
}
System.out.println(generateEdge(130));
}
private static void showStreamsStatus(List<Stream> streams)
{
System.out.println("Stream Status");
System.out.println(generateEdge(130));
System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20),
inMiddle("Producers", 55), inMiddle("Consumers", 55));
System.out.println(generateEdge(130));
for (Stream stream : streams)
{
System.out
.format(
"%-20s%-55s%-55s%n",
inMiddle(stream.streamName, 20),
inMiddle(getFormatString(stream.producers, stream.clusterAppMap),
55),
inMiddle(getFormatString(stream.consumers, stream.clusterAppMap),
55));
}
System.out.println(generateEdge(130));
}
private static String inMiddle(String content, int width)
{
int i = (width - content.length()) / 2;
return String.format("%" + i + "s%s", " ", content);
}
private static String generateEdge(int length)
{
StringBuilder sb = new StringBuilder();
for (int i = 0; i < length; i++)
{
sb.append("-");
}
return sb.toString();
}
/**
* show as cluster1(app1), cluster2(app2)
*
* @param clusters
* cluster list
* @param clusterAppMap
* <cluster,app>
* @return
*/
private static String getFormatString(Collection<String> clusters,
Map<String, String> clusterAppMap)
{
if (clusters == null || clusters.size() == 0)
{
return NONE;
} else
{
// show as: cluster1(app1), cluster2(app2)
StringBuilder sb = new StringBuilder();
for (String cluster : clusters)
{
String app = clusterAppMap.get(cluster);
sb.append(cluster);
if (!NONE.equals(app))
{
sb.append("(").append(app).append(")");
}
sb.append(" ");
}
return sb.toString();
}
}
static class Stream
{
private final ZkClient zkClient;
private final String consumerPath;
private final String producerPath;
String streamName;
Set<String> producers = new HashSet<String>();// cluster name
Set<String> consumers = new HashSet<String>();// cluster name
Map<String, String> clusterAppMap = Maps.newHashMap();
public Stream(String streamName, ZkClient zkClient) throws Exception
{
this.streamName = streamName;
this.zkClient = zkClient;
this.consumerPath = "/s4/streams/" + streamName + "/consumers";
this.producerPath = "/s4/streams/" + streamName + "/producers";
readStreamFromZk();
}
private void readStreamFromZk() throws Exception
{
List<String> consumerNodes = zkClient.getChildren(consumerPath);
for (String node : consumerNodes)
{
ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
consumers.add(consumer.getSimpleField("clusterName"));
}
List<String> producerNodes = zkClient.getChildren(producerPath);
for (String node : producerNodes)
{
ZNRecord consumer = zkClient.readData(producerPath + "/" + node, true);
producers.add(consumer.getSimpleField("clusterName"));
}
getAppNames();
}
private void getAppNames()
{
Set<String> clusters = new HashSet<String>(consumers);
clusters.addAll(producers);
for (String cluster : clusters)
{
clusterAppMap.put(cluster, getApp(cluster, zkClient));
}
}
public boolean containsCluster(String cluster)
{
if (producers.contains(cluster) || consumers.contains(cluster))
{
return true;
}
return false;
}
private static String getApp(String clusterName, ZkClient zkClient)
{
String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
if (zkClient.exists(appPath))
{
ZNRecord appRecord = zkClient.readData("/s4/clusters/" + clusterName
+ "/app/s4App");
return appRecord.getSimpleField("name");
}
return NONE;
}
}
static class App
{
private String name = NONE;
private String cluster;
private String uri = NONE;
}
static class Cluster
{
private final ZkClient zkClient;
private final String taskPath;
private final String processPath;
private final String appPath;
String clusterName;
int taskNumber;
App app;
List<ClusterNode> nodes = new ArrayList<ClusterNode>();
public Cluster(String clusterName, ZkClient zkClient) throws Exception
{
this.clusterName = clusterName;
this.zkClient = zkClient;
this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
this.processPath = "/s4/clusters/" + clusterName + "/process";
this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
readClusterFromZk();
}
public void readClusterFromZk() throws Exception
{
List<String> processes;
List<String> tasks;
tasks = zkClient.getChildren(taskPath);
processes = zkClient.getChildren(processPath);
taskNumber = tasks.size();
for (int i = 0; i < processes.size(); i++)
{
ZNRecord process = zkClient.readData(
processPath + "/" + processes.get(i), true);
if (process != null)
{
int partition = Integer.parseInt(process.getSimpleField("partition"));
String host = process.getSimpleField("host");
int port = Integer.parseInt(process.getSimpleField("port"));
String taskId = process.getSimpleField("taskId");
ClusterNode node = new ClusterNode(partition, port, host, taskId);
nodes.add(node);
}
}
app = new App();
app.cluster = clusterName;
try
{
ZNRecord appRecord = zkClient.readData(appPath);
app.name = appRecord.getSimpleField("name");
app.uri = appRecord.getSimpleField("s4r_uri");
} catch (ZkNoNodeException e)
{
logger.warn(appPath + " doesn't exist");
}
}
}
}