blob: f78f52ae7f6c8740e965b8f4bdec048e57652e3e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.cluster;
import backtype.storm.Config;
import backtype.storm.generated.StormTopology;
import backtype.storm.utils.LocalState;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StormConfig {
private final static Logger LOG = LoggerFactory.getLogger(StormConfig.class);
public final static String RESOURCES_SUBDIR = "resources";
public final static String WORKER_DATA_SUBDIR = "worker_shared_data";
public static final String FILE_SEPERATEOR = File.separator;
public static String clojureConfigName(String name) {
return name.toUpperCase().replace("_", "-");
}
public static Map read_storm_config() {
return Utils.readStormConfig();
}
public static Map read_yaml_config(String name) {
return Utils.findAndReadConfigFile(name, true);
}
public static Map read_default_config() {
return Utils.readDefaultConfig();
}
public static List<Object> All_CONFIGS() {
List<Object> rtn = new ArrayList<Object>();
Config config = new Config();
Class<?> ConfigClass = config.getClass();
Field[] fields = ConfigClass.getFields();
for (int i = 0; i < fields.length; i++) {
try {
Object obj = fields[i].get(null);
rtn.add(obj);
} catch (IllegalArgumentException e) {
LOG.error(e.getMessage(), e);
} catch (IllegalAccessException e) {
LOG.error(e.getMessage(), e);
}
}
return rtn;
}
public static HashMap<String, Object> getClassFields(Class<?> cls) throws IllegalArgumentException, IllegalAccessException {
Field[] list = cls.getDeclaredFields();
HashMap<String, Object> rtn = new HashMap<String, Object>();
for (Field f : list) {
String name = f.getName();
rtn.put(name, f.get(null).toString());
}
return rtn;
}
public static String cluster_mode(Map conf) {
String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
return mode;
}
/**
* please use ConfigExtension.getClusterName(Map conf)
*/
@Deprecated
public static String cluster_name(Map conf) {
return ConfigExtension.getClusterName(conf);
}
public static boolean local_mode(Map conf) {
String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
if (mode != null) {
if ("local".equals(mode)) {
return true;
}
if ("distributed".equals(mode)) {
return false;
}
}
throw new IllegalArgumentException("Illegal cluster mode in conf:" + mode);
}
/**
* validate whether the mode is distributed
*
* @param conf
*/
public static void validate_distributed_mode(Map<?, ?> conf) {
if (StormConfig.local_mode(conf)) {
throw new IllegalArgumentException("Cannot start server in local mode!");
}
}
public static void validate_local_mode(Map<?, ?> conf) {
if (!StormConfig.local_mode(conf)) {
throw new IllegalArgumentException("Cannot start server in distributed mode!");
}
}
public static String worker_root(Map conf) throws IOException {
String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "workers";
FileUtils.forceMkdir(new File(ret));
return ret;
}
public static String worker_root(Map conf, String id) throws IOException {
String ret = worker_root(conf) + FILE_SEPERATEOR + id;
FileUtils.forceMkdir(new File(ret));
return ret;
}
public static String worker_pids_root(Map conf, String id) throws IOException {
String ret = worker_root(conf, id) + FILE_SEPERATEOR + "pids";
FileUtils.forceMkdir(new File(ret));
return ret;
}
public static String worker_pid_path(Map conf, String id, String pid) throws IOException {
String ret = worker_pids_root(conf, id) + FILE_SEPERATEOR + pid;
return ret;
}
public static String worker_heartbeats_root(Map conf, String id) throws IOException {
String ret = worker_root(conf, id) + FILE_SEPERATEOR + "heartbeats";
FileUtils.forceMkdir(new File(ret));
return ret;
}
public static String default_worker_shared_dir(Map conf) throws IOException {
String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + WORKER_DATA_SUBDIR;
FileUtils.forceMkdir(new File(ret));
return ret;
}
private static String drpc_local_dir(Map conf) throws IOException {
String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "drpc";
FileUtils.forceMkdir(new File(ret));
return ret;
}
private static String supervisor_local_dir(Map conf) throws IOException {
String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "supervisor";
FileUtils.forceMkdir(new File(ret));
return ret;
}
public static String supervisor_stormdist_root(Map conf) throws IOException {
String ret = stormdist_path(supervisor_local_dir(conf));
FileUtils.forceMkdir(new File(ret));
return ret;
}
public static String supervisor_stormdist_root(Map conf, String topologyId) throws IOException {
return supervisor_stormdist_root(conf) + FILE_SEPERATEOR + topologyId;
}
/**
* Return supervisor's pid dir
*
* @param conf
* @return
* @throws IOException
*/
public static String supervisorPids(Map conf) throws IOException {
String ret = supervisor_local_dir(conf) + FILE_SEPERATEOR + "pids";
try {
FileUtils.forceMkdir(new File(ret));
} catch (IOException e) {
LOG.error("Failed to create dir " + ret, e);
throw e;
}
return ret;
}
/**
* Return drpc's pid dir
*
* @param conf
* @return
* @throws IOException
*/
public static String drpcPids(Map conf) throws IOException {
String ret = drpc_local_dir(conf) + FILE_SEPERATEOR + "pids";
try {
FileUtils.forceMkdir(new File(ret));
} catch (IOException e) {
LOG.error("Failed to create dir " + ret, e);
throw e;
}
return ret;
}
/**
* Return nimbus's heartbeat dir for apsara
*
* @param conf
* @return
* @throws IOException
*/
public static String supervisorHearbeatForContainer(Map conf) throws IOException {
String ret = supervisor_local_dir(conf) + FILE_SEPERATEOR + "supervisor.heartbeat";
try {
FileUtils.forceMkdir(new File(ret));
} catch (IOException e) {
LOG.error("Failed to create dir " + ret, e);
throw e;
}
return ret;
}
public static String stormjar_path(String stormroot) {
return stormroot + FILE_SEPERATEOR + "stormjar.jar";
}
public static String stormcode_path(String stormroot) {
return stormroot + FILE_SEPERATEOR + "stormcode.ser";
}
public static String stormconf_path(String stormroot) {
return stormroot + FILE_SEPERATEOR + "stormconf.ser";
}
public static String stormlib_path(String stormroot, String libname) {
return stormroot + FILE_SEPERATEOR + "lib" + FILE_SEPERATEOR + libname;
}
public static String stormlib_path(String stormroot) {
return stormroot + FILE_SEPERATEOR + "lib";
}
public static String stormdist_path(String stormroot) {
return stormroot + FILE_SEPERATEOR + "stormdist";
}
public static String supervisor_storm_resources_path(String stormroot) {
return stormroot + FILE_SEPERATEOR + RESOURCES_SUBDIR;
}
public static String stormtmp_path(String stormroot) {
return stormroot + FILE_SEPERATEOR + "tmp";
}
public static String stormts_path(String stormroot) {
return stormroot + FILE_SEPERATEOR + "timestamp";
}
public static LocalState worker_state(Map conf, String id) throws IOException {
String path = worker_heartbeats_root(conf, id);
LocalState rtn = new LocalState(path);
return rtn;
}
public static String masterLocalDir(Map conf) throws IOException {
String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "nimbus";
try {
FileUtils.forceMkdir(new File(ret));
} catch (IOException e) {
LOG.error("Failed to create dir " + ret, e);
throw e;
}
return ret;
}
public static String metricLocalDir(Map conf) throws IOException {
String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "metrics";
try {
FileUtils.forceMkdir(new File(ret));
} catch (IOException e) {
LOG.error("Failed to create dir " + ret, e);
throw e;
}
return ret;
}
public static String masterStormdistRoot(Map conf) throws IOException {
String ret = stormdist_path(masterLocalDir(conf));
FileUtils.forceMkdir(new File(ret));
return ret;
}
public static String masterStormdistRoot(Map conf, String topologyId) throws IOException {
return masterStormdistRoot(conf) + FILE_SEPERATEOR + topologyId;
}
public static String masterStormTmpRoot(Map conf) throws IOException {
String ret = stormtmp_path(masterLocalDir(conf));
FileUtils.forceMkdir(new File(ret));
return ret;
}
public static String masterStormTmpRoot(Map conf, String topologyId) throws IOException {
return masterStormTmpRoot(conf) + FILE_SEPERATEOR + topologyId;
}
public static String masterInbox(Map conf) throws IOException {
String ret = masterLocalDir(conf) + FILE_SEPERATEOR + "inbox";
try {
FileUtils.forceMkdir(new File(ret));
} catch (IOException e) {
LOG.error("Failed to create dir " + ret, e);
throw e;
}
return ret;
}
public static String masterInimbus(Map conf) throws IOException {
String ret = masterLocalDir(conf) + FILE_SEPERATEOR + "ininumbus";
try {
FileUtils.forceMkdir(new File(ret));
} catch (IOException e) {
LOG.error("Failed to create dir " + ret, e);
throw e;
}
return ret;
}
/**
* Return nimbus's pid dir
*
* @param conf
* @return
* @throws IOException
*/
public static String masterPids(Map conf) throws IOException {
String ret = masterLocalDir(conf) + FILE_SEPERATEOR + "pids";
try {
FileUtils.forceMkdir(new File(ret));
} catch (IOException e) {
LOG.error("Failed to create dir " + ret, e);
throw e;
}
return ret;
}
/**
* Return nimbus's heartbeat dir for apsara
*
* @param conf
* @return
* @throws IOException
*/
public static String masterHearbeatForContainer(Map conf) throws IOException {
String ret = masterLocalDir(conf) + FILE_SEPERATEOR + "nimbus.heartbeat";
try {
FileUtils.forceMkdir(new File(ret));
} catch (IOException e) {
LOG.error("Failed to create dir " + ret, e);
throw e;
}
return ret;
}
public static String masterDbDir(Map conf) throws IOException {
return masterLocalDir(conf) + FILE_SEPERATEOR + "rocksdb";
}
public static String metricDbDir(Map conf) throws IOException {
return metricLocalDir(conf) + FILE_SEPERATEOR + "rocksdb";
}
public static String supervisorTmpDir(Map conf) throws IOException {
String ret = null;
try {
ret = supervisor_local_dir(conf) + FILE_SEPERATEOR + "tmp";
FileUtils.forceMkdir(new File(ret));
} catch (IOException e) {
LOG.error("Failed to create dir " + ret, e);
throw e;
}
return ret;
}
public static LocalState supervisorState(Map conf) throws IOException {
LocalState localState = null;
try {
String localstateDir = supervisor_local_dir(conf) + FILE_SEPERATEOR + "localstate";
FileUtils.forceMkdir(new File(localstateDir));
localState = new LocalState(localstateDir);
} catch (IOException e) {
LOG.error("Failed to create supervisor LocalState", e);
throw e;
}
return localState;
}
/**
* stormconf is mergered into clusterconf
*
* @param conf
* @param topologyId
* @return
* @throws IOException
*/
public static Map read_supervisor_topology_conf(Map conf, String topologyId) throws IOException {
String topologyRoot = StormConfig.supervisor_stormdist_root(conf, topologyId);
String confPath = StormConfig.stormconf_path(topologyRoot);
return (Map) readLocalObject(topologyId, confPath);
}
public static StormTopology read_supervisor_topology_code(Map conf, String topologyId) throws IOException {
String topologyRoot = StormConfig.supervisor_stormdist_root(conf, topologyId);
String codePath = StormConfig.stormcode_path(topologyRoot);
return (StormTopology) readLocalObject(topologyId, codePath);
}
@SuppressWarnings("rawtypes")
public static List<String> get_supervisor_toplogy_list(Map conf) throws IOException {
// get the path: STORM-LOCAL-DIR/supervisor/stormdist/
String path = StormConfig.supervisor_stormdist_root(conf);
List<String> topologyids = PathUtils.read_dir_contents(path);
return topologyids;
}
public static Map read_nimbus_topology_conf(Map conf, String topologyId) throws IOException {
String topologyRoot = StormConfig.masterStormdistRoot(conf, topologyId);
return read_topology_conf(topologyRoot, topologyId);
}
public static void write_nimbus_topology_conf(Map conf, String topologyId, Map topoConf) throws IOException {
String topologyRoot = StormConfig.masterStormdistRoot(conf, topologyId);
String confPath = StormConfig.stormconf_path(topologyRoot);
FileUtils.writeByteArrayToFile(new File(confPath), Utils.serialize(topoConf));
}
public static Map read_nimbusTmp_topology_conf(Map conf, String topologyId) throws IOException {
String topologyRoot = StormConfig.masterStormTmpRoot(conf, topologyId);
return read_topology_conf(topologyRoot, topologyId);
}
public static Map read_topology_conf(String topologyRoot, String topologyId) throws IOException {
String readFile = StormConfig.stormconf_path(topologyRoot);
return (Map) readLocalObject(topologyId, readFile);
}
public static StormTopology read_nimbus_topology_code(Map conf, String topologyId) throws IOException {
String topologyRoot = StormConfig.masterStormdistRoot(conf, topologyId);
String codePath = StormConfig.stormcode_path(topologyRoot);
return (StormTopology) readLocalObject(topologyId, codePath);
}
public static void write_nimbus_topology_code(Map conf, String topologyId, byte[] data) throws IOException {
String topologyRoot = StormConfig.masterStormdistRoot(conf, topologyId);
String codePath = StormConfig.stormcode_path(topologyRoot);
FileUtils.writeByteArrayToFile(new File(codePath), data);
}
public static long read_supervisor_topology_timestamp(Map conf, String topologyId) throws IOException {
String stormRoot = supervisor_stormdist_root(conf, topologyId);
String timeStampPath = stormts_path(stormRoot);
byte[] data = FileUtils.readFileToByteArray(new File(timeStampPath));
return JStormUtils.bytesToLong(data);
}
public static void write_supervisor_topology_timestamp(Map conf, String topologyId, long timeStamp) throws IOException {
String stormRoot = supervisor_stormdist_root(conf, topologyId);
String timeStampPath = stormts_path(stormRoot);
byte[] data = JStormUtils.longToBytes(timeStamp);
FileUtils.writeByteArrayToFile(new File(timeStampPath), data);
}
public static long read_nimbus_topology_timestamp(Map conf, String topologyId) throws IOException {
String stormRoot = masterStormdistRoot(conf, topologyId);
String timeStampPath = stormts_path(stormRoot);
byte[] data = FileUtils.readFileToByteArray(new File(timeStampPath));
return JStormUtils.bytesToLong(data);
}
public static void write_nimbus_topology_timestamp(Map conf, String topologyId, long timeStamp) throws IOException {
String stormRoot = masterStormdistRoot(conf, topologyId);
String timeStampPath = stormts_path(stormRoot);
byte[] data = JStormUtils.longToBytes(timeStamp);
FileUtils.writeByteArrayToFile(new File(timeStampPath), data);
}
/**
* stormconf has mergered into clusterconf
*
* @param topologyId
* @param readFile
* @return
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static Object readLocalObject(String topologyId, String readFile) throws IOException {
String errMsg = "Failed to get topology configuration of " + topologyId + " file:" + readFile;
byte[] bconf = FileUtils.readFileToByteArray(new File(readFile));
if (bconf == null) {
errMsg += ", due to failed to read";
LOG.error(errMsg);
throw new IOException(errMsg);
}
Object ret = null;
try {
ret = Utils.javaDeserialize(bconf);
} catch (Exception e) {
errMsg += ", due to failed to serialized the data";
LOG.error(errMsg);
throw new IOException(errMsg);
}
return ret;
}
public static long get_supervisor_topology_Bianrymodify_time(Map conf, String topologyId) throws IOException {
String topologyRoot = StormConfig.supervisor_stormdist_root(conf, topologyId);
File f = new File(topologyRoot);
long modifyTime = f.lastModified();
return modifyTime;
}
}