blob: aacdd8f2413207d898e3e217c6cce8de78931b6b [file] [log] [blame]
package backtype.storm.utils;
import backtype.storm.Config;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.ComponentObject;
import backtype.storm.generated.StormTopology;
import clojure.lang.IFn;
import clojure.lang.RT;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.RetryNTimes;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
import org.apache.thrift7.TException;
import org.json.simple.JSONValue;
import org.yaml.snakeyaml.Yaml;
public class Utils {
public static final String DEFAULT_STREAM_ID = "default";
public static Object newInstance(String klass) {
try {
Class c = Class.forName(klass);
return c.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static byte[] serialize(Object obj) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
oos.close();
return bos.toByteArray();
} catch(IOException ioe) {
throw new RuntimeException(ioe);
}
}
public static Object deserialize(byte[] serialized) {
try {
ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
ObjectInputStream ois = new ObjectInputStream(bis);
Object ret = ois.readObject();
ois.close();
return ret;
} catch(IOException ioe) {
throw new RuntimeException(ioe);
} catch(ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
public static <T> String join(Iterable<T> coll, String sep) {
Iterator<T> it = coll.iterator();
String ret = "";
while(it.hasNext()) {
ret = ret + it.next();
if(it.hasNext()) {
ret = ret + sep;
}
}
return ret;
}
public static void sleep(long millis) {
try {
Time.sleep(millis);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
public static List<URL> findResources(String name) {
try {
Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name);
List<URL> ret = new ArrayList<URL>();
while(resources.hasMoreElements()) {
ret.add(resources.nextElement());
}
return ret;
} catch(IOException e) {
throw new RuntimeException(e);
}
}
public static Map findAndReadConfigFile(String name, boolean mustExist) {
try {
List<URL> resources = findResources(name);
if(resources.isEmpty()) {
if(mustExist) throw new RuntimeException("Could not find config file on classpath " + name);
else return new HashMap();
}
if(resources.size() > 1) {
throw new RuntimeException("Found multiple " + name + " resources. You're probably bundling the Storm jars with your topology jar.");
}
URL resource = resources.get(0);
Yaml yaml = new Yaml();
Map ret = (Map) yaml.load(new InputStreamReader(resource.openStream()));
if(ret==null) ret = new HashMap();
return new HashMap(ret);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static Map findAndReadConfigFile(String name) {
return findAndReadConfigFile(name, true);
}
public static Map readDefaultConfig() {
return findAndReadConfigFile("defaults.yaml", true);
}
public static Map readCommandLineOpts() {
Map ret = new HashMap();
String commandOptions = System.getProperty("storm.options");
if(commandOptions != null) {
commandOptions = commandOptions.replaceAll("%%%%", " ");
String[] configs = commandOptions.split(",");
for (String config : configs) {
String[] options = config.split("=");
if (options.length == 2) {
ret.put(options[0], options[1]);
}
}
}
return ret;
}
public static Map readStormConfig() {
Map ret = readDefaultConfig();
String confFile = System.getProperty("storm.conf.file");
Map storm;
if (confFile==null || confFile.equals("")) {
storm = findAndReadConfigFile("storm.yaml", false);
} else {
storm = findAndReadConfigFile(confFile, true);
}
ret.putAll(storm);
ret.putAll(readCommandLineOpts());
return ret;
}
private static Object normalizeConf(Object conf) {
if(conf==null) return new HashMap();
if(conf instanceof Map) {
Map confMap = new HashMap((Map) conf);
for(Object key: confMap.keySet()) {
Object val = confMap.get(key);
confMap.put(key, normalizeConf(val));
}
return confMap;
} else if(conf instanceof List) {
List confList = new ArrayList((List) conf);
for(int i=0; i<confList.size(); i++) {
Object val = confList.get(i);
confList.set(i, normalizeConf(val));
}
return confList;
} else if (conf instanceof Integer) {
return ((Integer) conf).longValue();
} else if(conf instanceof Float) {
return ((Float) conf).doubleValue();
} else {
return conf;
}
}
public static boolean isValidConf(Map<String, Object> stormConf) {
return normalizeConf(stormConf).equals(normalizeConf((Map) JSONValue.parse(JSONValue.toJSONString(stormConf))));
}
public static Object getSetComponentObject(ComponentObject obj) {
if(obj.getSetField()==ComponentObject._Fields.SERIALIZED_JAVA) {
return Utils.deserialize(obj.get_serialized_java());
} else if(obj.getSetField()==ComponentObject._Fields.JAVA_OBJECT) {
return obj.get_java_object();
} else {
return obj.get_shell();
}
}
public static <S, T> T get(Map<S, T> m, S key, T def) {
T ret = m.get(key);
if(ret==null) {
ret = def;
}
return ret;
}
public static List<Object> tuple(Object... values) {
List<Object> ret = new ArrayList<Object>();
for(Object v: values) {
ret.add(v);
}
return ret;
}
public static void downloadFromMaster(Map conf, String file, String localFile) throws IOException, TException {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
String id = client.getClient().beginFileDownload(file);
WritableByteChannel out = Channels.newChannel(new FileOutputStream(localFile));
while(true) {
ByteBuffer chunk = client.getClient().downloadChunk(id);
int written = out.write(chunk);
if(written==0) break;
}
out.close();
}
public static IFn loadClojureFn(String namespace, String name) {
try {
clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
} catch (Exception e) {
//if playing from the repl and defining functions, file won't exist
}
return (IFn) RT.var(namespace, name).deref();
}
public static boolean isSystemId(String id) {
return id.startsWith("__");
}
public static <K, V> Map<V, K> reverseMap(Map<K, V> map) {
Map<V, K> ret = new HashMap<V, K>();
for(K key: map.keySet()) {
ret.put(map.get(key), key);
}
return ret;
}
public static ComponentCommon getComponentCommon(StormTopology topology, String id) {
if(topology.get_spouts().containsKey(id)) {
return topology.get_spouts().get(id).get_common();
}
if(topology.get_bolts().containsKey(id)) {
return topology.get_bolts().get(id).get_common();
}
if(topology.get_state_spouts().containsKey(id)) {
return topology.get_state_spouts().get(id).get_common();
}
throw new IllegalArgumentException("Could not find component with id " + id);
}
public static Integer getInt(Object o) {
if(o instanceof Long) {
return ((Long) o ).intValue();
} else if (o instanceof Integer) {
return (Integer) o;
} else if (o instanceof Short) {
return ((Short) o).intValue();
} else {
throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
}
}
public static long secureRandomLong() {
return UUID.randomUUID().getLeastSignificantBits();
}
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) {
return newCurator(conf, servers, port, root, null);
}
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
List<String> serverPorts = new ArrayList<String>();
for(String zkServer: (List<String>) servers) {
serverPorts.add(zkServer + ":" + Utils.getInt(port));
}
String zkStr = StringUtils.join(serverPorts, ",") + root;
try {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkStr)
.connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
.sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
.retryPolicy(new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
if(auth!=null && auth.scheme!=null) {
builder = builder.authorization(auth.scheme, auth.payload);
}
return builder.build();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port) {
return newCurator(conf, servers, port, "");
}
public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root) {
CuratorFramework ret = newCurator(conf, servers, port, root);
ret.start();
return ret;
}
public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port) {
CuratorFramework ret = newCurator(conf, servers, port);
ret.start();
return ret;
}
/**
*
(defn integer-divided [sum num-pieces]
(let [base (int (/ sum num-pieces))
num-inc (mod sum num-pieces)
num-bases (- num-pieces num-inc)]
(if (= num-inc 0)
{base num-bases}
{base num-bases (inc base) num-inc}
)))
* @param sum
* @param numPieces
* @return
*/
public static TreeMap<Integer, Integer> integerDivided(int sum, int numPieces) {
int base = sum / numPieces;
int numInc = sum % numPieces;
int numBases = numPieces - numInc;
TreeMap<Integer, Integer> ret = new TreeMap<Integer, Integer>();
ret.put(base, numBases);
if(numInc!=0) {
ret.put(base+1, numInc);
}
return ret;
}
public static byte[] toByteArray(ByteBuffer buffer) {
byte[] ret = new byte[buffer.remaining()];
buffer.get(ret, 0, ret.length);
return ret;
}
}