blob: 0bb1bb7e8f00297b1065d697ba956204719892d5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.cluster;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.error.TaskError;
import com.alibaba.jstorm.utils.TimeUtils;
/**
* storm operation ZK
*
* @author yannian/longda/zhiyuan.ls
*
*/
public class Cluster {
// TODO Need Migrate constants to ZkConstant
private static Logger LOG = LoggerFactory.getLogger(Cluster.class);
public static final String ZK_SEPERATOR = "/";
public static final String ASSIGNMENTS_ROOT = "assignments";
public static final String ASSIGNMENTS_BAK = "assignments_bak";
public static final String TASKS_ROOT = "tasks";
public static final String STORMS_ROOT = "topology";
public static final String SUPERVISORS_ROOT = "supervisors";
public static final String TASKBEATS_ROOT = "taskbeats";
public static final String TASKERRORS_ROOT = "taskerrors";
public static final String MASTER_ROOT = "nimbus_master";
public static final String NIMBUS_SLAVE_ROOT = "nimbus_slave";
public static final String METRIC_ROOT = "metrics";
public static final String LAST_ERROR = "last_error";
public static final String NIMBUS_SLAVE_DETAIL_ROOT= "nimbus_slave_detail";
public static final String BACKPRESSURE_ROOT = "backpressure";
public static final String ASSIGNMENTS_SUBTREE;
public static final String ASSIGNMENTS_BAK_SUBTREE;
public static final String TASKS_SUBTREE;
public static final String STORMS_SUBTREE;
public static final String SUPERVISORS_SUBTREE;
public static final String TASKBEATS_SUBTREE;
public static final String TASKERRORS_SUBTREE;
public static final String MASTER_SUBTREE;
public static final String NIMBUS_SLAVE_SUBTREE;
public static final String METRIC_SUBTREE;
public static final String NIMBUS_SLAVE_DETAIL_SUBTREE;
public static final String BACKPRESSURE_SUBTREE;
static {
ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_ROOT;
ASSIGNMENTS_BAK_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_BAK;
TASKS_SUBTREE = ZK_SEPERATOR + TASKS_ROOT;
STORMS_SUBTREE = ZK_SEPERATOR + STORMS_ROOT;
SUPERVISORS_SUBTREE = ZK_SEPERATOR + SUPERVISORS_ROOT;
TASKBEATS_SUBTREE = ZK_SEPERATOR + TASKBEATS_ROOT;
TASKERRORS_SUBTREE = ZK_SEPERATOR + TASKERRORS_ROOT;
MASTER_SUBTREE = ZK_SEPERATOR + MASTER_ROOT;
NIMBUS_SLAVE_SUBTREE = ZK_SEPERATOR + NIMBUS_SLAVE_ROOT;
METRIC_SUBTREE = ZK_SEPERATOR + METRIC_ROOT;
NIMBUS_SLAVE_DETAIL_SUBTREE = ZK_SEPERATOR + NIMBUS_SLAVE_DETAIL_ROOT;
BACKPRESSURE_SUBTREE = ZK_SEPERATOR + BACKPRESSURE_ROOT;
}
public static String supervisor_path(String id) {
return SUPERVISORS_SUBTREE + ZK_SEPERATOR + id;
}
public static String assignment_path(String id) {
return ASSIGNMENTS_SUBTREE + ZK_SEPERATOR + id;
}
public static String storm_path(String id) {
return STORMS_SUBTREE + ZK_SEPERATOR + id;
}
public static String storm_task_root(String topology_id) {
return TASKS_SUBTREE + ZK_SEPERATOR + topology_id;
}
public static String taskbeat_storm_root(String topology_id) {
return TASKBEATS_SUBTREE + ZK_SEPERATOR + topology_id;
}
public static String taskerror_storm_root(String topology_id) {
return TASKERRORS_SUBTREE + ZK_SEPERATOR + topology_id;
}
public static String lasterror_path(String topology_id) {
return taskerror_storm_root(topology_id) + ZK_SEPERATOR + LAST_ERROR;
}
public static String taskerror_path(String topology_id, int task_id) {
return taskerror_storm_root(topology_id) + ZK_SEPERATOR + task_id;
}
public static String metric_path(String topology_id) {
return METRIC_SUBTREE + ZK_SEPERATOR + topology_id;
}
public static String assignment_bak_path(String id) {
return ASSIGNMENTS_BAK_SUBTREE + ZK_SEPERATOR + id;
}
public static String backpressure_path(String topology_id) {
return BACKPRESSURE_SUBTREE + ZK_SEPERATOR + topology_id;
}
@SuppressWarnings("rawtypes")
public static StormClusterState mk_storm_cluster_state(Map cluster_state_spec) throws Exception {
return new StormZkClusterState(cluster_state_spec);
}
public static StormClusterState mk_storm_cluster_state(ClusterState cluster_state_spec) throws Exception {
return new StormZkClusterState(cluster_state_spec);
}
public static Map<Integer, TaskInfo> get_all_taskInfo(StormClusterState zkCluster, String topologyId) throws Exception {
return zkCluster.task_all_info(topologyId);
}
public static Map<Integer, String> get_all_task_component(StormClusterState zkCluster, String topologyId, Map<Integer, TaskInfo> taskInfoMap)
throws Exception {
if (taskInfoMap == null) {
taskInfoMap = get_all_taskInfo(zkCluster, topologyId);
}
if (taskInfoMap == null) {
return null;
}
return Common.getTaskToComponent(taskInfoMap);
}
public static Map<Integer, String> get_all_task_type(StormClusterState zkCluster, String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception {
if (taskInfoMap == null) {
taskInfoMap = get_all_taskInfo(zkCluster, topologyId);
}
if (taskInfoMap == null) {
return null;
}
return Common.getTaskToType(taskInfoMap);
}
/**
* if one topology's name equal the input storm_name, then return the topology id, otherwise return null
*
* @param zkCluster
* @param storm_name
* @return
* @throws Exception
*/
public static String get_topology_id(StormClusterState zkCluster, String storm_name) throws Exception {
List<String> active_storms = zkCluster.active_storms();
String rtn = null;
if (active_storms != null) {
for (String topology_id : active_storms) {
if (topology_id.indexOf(storm_name) < 0) {
continue;
}
StormBase base = zkCluster.storm_base(topology_id, null);
if (base != null && storm_name.equals(Common.getTopologyNameById(topology_id))) {
rtn = topology_id;
break;
}
}
}
return rtn;
}
/**
* get all topology's StormBase
*
* @param zkCluster
* @return <topology_id, StormBase>
* @throws Exception
*/
public static HashMap<String, StormBase> get_all_StormBase(StormClusterState zkCluster) throws Exception {
HashMap<String, StormBase> rtn = new HashMap<String, StormBase>();
List<String> active_storms = zkCluster.active_storms();
if (active_storms != null) {
for (String topology_id : active_storms) {
StormBase base = zkCluster.storm_base(topology_id, null);
if (base != null) {
rtn.put(topology_id, base);
}
}
}
return rtn;
}
/**
* get all SupervisorInfo of storm cluster
*
* @param stormClusterState
* @param callback
* @return Map<String, SupervisorInfo> String: supervisorId SupervisorInfo: [time-secs hostname worker-ports uptime-secs]
* @throws Exception
*/
public static Map<String, SupervisorInfo> get_all_SupervisorInfo(StormClusterState stormClusterState, RunnableCallback callback) throws Exception {
Map<String, SupervisorInfo> rtn = new TreeMap<String, SupervisorInfo>();
// get /ZK/supervisors
List<String> supervisorIds = stormClusterState.supervisors(callback);
if (supervisorIds != null) {
for (Iterator<String> iter = supervisorIds.iterator(); iter.hasNext();) {
String supervisorId = iter.next();
// get /supervisors/supervisorid
SupervisorInfo supervisorInfo = stormClusterState.supervisor_info(supervisorId);
if (supervisorInfo == null) {
LOG.warn("Failed to get SupervisorInfo of " + supervisorId);
} else {
rtn.put(supervisorId, supervisorInfo);
}
}
} else {
LOG.info("No alive supervisor");
}
return rtn;
}
public static Map<String, Assignment> get_all_assignment(StormClusterState stormClusterState, RunnableCallback callback) throws Exception {
Map<String, Assignment> ret = new HashMap<String, Assignment>();
// get /assignments {topology_id}
List<String> assignments = stormClusterState.assignments(callback);
if (assignments == null) {
LOG.debug("No assignment of ZK");
return ret;
}
for (String topology_id : assignments) {
Assignment assignment = stormClusterState.assignment_info(topology_id, callback);
if (assignment == null) {
LOG.error("Failed to get Assignment of " + topology_id + " from ZK");
continue;
}
ret.put(topology_id, assignment);
}
return ret;
}
public static Map<String, String> get_all_nimbus_slave(StormClusterState stormClusterState) throws Exception {
List<String> hosts = stormClusterState.get_nimbus_slaves();
if (hosts == null || hosts.size() == 0) {
return null;
}
Map<String, String> ret = new HashMap<String, String>();
for (String host : hosts) {
String time = stormClusterState.get_nimbus_slave_time(host);
ret.put(host, time);
}
return ret;
}
public static String get_supervisor_hostname(StormClusterState stormClusterState, String supervisorId) throws Exception {
SupervisorInfo supervisorInfo = stormClusterState.supervisor_info(supervisorId);
if (supervisorInfo == null) {
return null;
} else {
return supervisorInfo.getHostName();
}
}
public static boolean is_topology_exist_error(StormClusterState stormClusterState, String topologyId) throws Exception {
Map<Integer, String> lastErrMap = stormClusterState.topo_lastErr_time(topologyId);
if (lastErrMap == null || lastErrMap.size() == 0) {
return false;
}
int now = TimeUtils.current_time_secs();
for (Entry<Integer, String> entry : lastErrMap.entrySet()) {
Integer timeout = entry.getKey();
Integer timestamp = Integer.valueOf(entry.getValue());
if (now - timestamp <= timeout) {
return true;
}
}
return false;
}
public static Map<Integer, List<TaskError>> get_all_task_errors(StormClusterState stormClusterState, String topologyId) {
Map<Integer, List<TaskError>> ret = new HashMap<Integer, List<TaskError>>();
try {
List<String> errorTasks = stormClusterState.task_error_ids(topologyId);
if (errorTasks == null || errorTasks.size() == 0) {
return ret;
}
for (String taskIdStr : errorTasks) {
Integer taskId = -1;
try {
taskId = Integer.valueOf(taskIdStr);
} catch (Exception e) {
// skip last_error
continue;
}
List<TaskError> taskErrorList = stormClusterState.task_errors(topologyId, taskId);
ret.put(taskId, taskErrorList);
}
return ret;
} catch (Exception e) {
// TODO Auto-generated catch block
return ret;
}
}
}