| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package backtype.storm.utils; |
| |
| import backtype.storm.Config; |
| import backtype.storm.generated.AuthorizationException; |
| import backtype.storm.generated.ComponentCommon; |
| import backtype.storm.generated.ComponentObject; |
| import backtype.storm.generated.StormTopology; |
| import backtype.storm.serialization.DefaultSerializationDelegate; |
| import backtype.storm.serialization.SerializationDelegate; |
| import clojure.lang.IFn; |
| import clojure.lang.RT; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.curator.framework.CuratorFrameworkFactory; |
| import org.apache.thrift.TException; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.data.ACL; |
| import org.apache.zookeeper.data.Id; |
| import org.json.simple.JSONValue; |
| import org.json.simple.parser.ParseException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.yaml.snakeyaml.Yaml; |
| import org.yaml.snakeyaml.constructor.SafeConstructor; |
| |
| import java.net.URL; |
| import java.net.URLDecoder; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.Channels; |
| import java.nio.channels.WritableByteChannel; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.ObjectOutputStream; |
| import java.io.ObjectInputStream; |
| import java.io.ByteArrayInputStream; |
| import java.io.OutputStreamWriter; |
| import java.io.InputStreamReader; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.FileOutputStream; |
| import java.io.BufferedReader; |
| import java.io.RandomAccessFile; |
| import java.io.Serializable; |
| import java.io.IOException; |
| import java.util.Map; |
| import java.util.Iterator; |
| import java.util.Enumeration; |
| import java.util.List; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.HashMap; |
| import java.util.TreeMap; |
| import java.util.UUID; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import java.util.zip.GZIPInputStream; |
| import java.util.zip.GZIPOutputStream; |
| import java.util.zip.ZipEntry; |
| import java.util.zip.ZipFile; |
| |
| import org.apache.thrift.TBase; |
| import org.apache.thrift.TDeserializer; |
| import org.apache.thrift.TSerializer; |
| |
| public class Utils { |
| private static final Logger LOG = LoggerFactory.getLogger(Utils.class); |
| public static final String DEFAULT_STREAM_ID = "default"; |
| private static ThreadLocal<TSerializer> threadSer = new ThreadLocal<TSerializer>(); |
| private static ThreadLocal<TDeserializer> threadDes = new ThreadLocal<TDeserializer>(); |
| |
| private static SerializationDelegate serializationDelegate; |
| |
| static { |
| Map conf = readStormConfig(); |
| serializationDelegate = getSerializationDelegate(conf); |
| } |
| |
| public static Object newInstance(String klass) { |
| try { |
| Class c = Class.forName(klass); |
| return c.newInstance(); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public static byte[] serialize(Object obj) { |
| return serializationDelegate.serialize(obj); |
| } |
| |
| public static <T> T deserialize(byte[] serialized, Class<T> clazz) { |
| return serializationDelegate.deserialize(serialized, clazz); |
| } |
| |
| public static byte[] thriftSerialize(TBase t) { |
| try { |
| TSerializer ser = threadSer.get(); |
| if (ser == null) { |
| ser = new TSerializer(); |
| threadSer.set(ser); |
| } |
| return ser.serialize(t); |
| } catch (TException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private static TDeserializer getDes() { |
| TDeserializer des = threadDes.get(); |
| if(des == null) { |
| des = new TDeserializer(); |
| threadDes.set(des); |
| } |
| return des; |
| } |
| |
| public static <T> T thriftDeserialize(Class c, byte[] b, int offset, int length) { |
| try { |
| T ret = (T) c.newInstance(); |
| TDeserializer des = getDes(); |
| des.deserialize((TBase)ret, b, offset, length); |
| return ret; |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public static <T> T thriftDeserialize(Class c, byte[] b) { |
| try { |
| T ret = (T) c.newInstance(); |
| TDeserializer des = getDes(); |
| des.deserialize((TBase) ret, b); |
| return ret; |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| |
| } |
| |
| public static byte[] javaSerialize(Object obj) { |
| try { |
| ByteArrayOutputStream bos = new ByteArrayOutputStream(); |
| ObjectOutputStream oos = new ObjectOutputStream(bos); |
| oos.writeObject(obj); |
| oos.close(); |
| return bos.toByteArray(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz) { |
| try { |
| ByteArrayInputStream bis = new ByteArrayInputStream(serialized); |
| ObjectInputStream ois = new ObjectInputStream(bis); |
| Object ret = ois.readObject(); |
| ois.close(); |
| return (T)ret; |
| } catch (IOException ioe) { |
| throw new RuntimeException(ioe); |
| } catch (ClassNotFoundException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public static byte[] gzip(byte[] data) { |
| try { |
| ByteArrayOutputStream bos = new ByteArrayOutputStream(); |
| GZIPOutputStream out = new GZIPOutputStream(bos); |
| out.write(data); |
| out.close(); |
| return bos.toByteArray(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public static byte[] gunzip(byte[] data) { |
| try { |
| ByteArrayOutputStream bos = new ByteArrayOutputStream(); |
| ByteArrayInputStream bis = new ByteArrayInputStream(data); |
| GZIPInputStream in = new GZIPInputStream(bis); |
| byte[] buffer = new byte[1024]; |
| int len = 0; |
| while ((len = in.read(buffer)) >= 0) { |
| bos.write(buffer, 0, len); |
| } |
| in.close(); |
| bos.close(); |
| return bos.toByteArray(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public static byte[] toCompressedJsonConf(Map<String, Object> stormConf) { |
| try { |
| ByteArrayOutputStream bos = new ByteArrayOutputStream(); |
| OutputStreamWriter out = new OutputStreamWriter(new GZIPOutputStream(bos)); |
| JSONValue.writeJSONString(stormConf, out); |
| out.close(); |
| return bos.toByteArray(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public static Map<String, Object> fromCompressedJsonConf(byte[] serialized) { |
| try { |
| ByteArrayInputStream bis = new ByteArrayInputStream(serialized); |
| InputStreamReader in = new InputStreamReader(new GZIPInputStream(bis)); |
| Object ret = JSONValue.parseWithException(in); |
| in.close(); |
| return (Map<String,Object>)ret; |
| } catch (IOException ioe) { |
| throw new RuntimeException(ioe); |
| } catch (ParseException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public static <T> String join(Iterable<T> coll, String sep) { |
| Iterator<T> it = coll.iterator(); |
| StringBuilder ret = new StringBuilder(); |
| while(it.hasNext()) { |
| ret.append(it.next()); |
| if(it.hasNext()) { |
| ret.append(sep); |
| } |
| } |
| return ret.toString(); |
| } |
| |
| public static void sleep(long millis) { |
| try { |
| Time.sleep(millis); |
| } catch(InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public static List<URL> findResources(String name) { |
| try { |
| Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name); |
| List<URL> ret = new ArrayList<URL>(); |
| while (resources.hasMoreElements()) { |
| ret.add(resources.nextElement()); |
| } |
| return ret; |
| } catch(IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public static Map findAndReadConfigFile(String name, boolean mustExist) { |
| InputStream in = null; |
| boolean confFileEmpty = false; |
| try { |
| in = getConfigFileInputStream(name); |
| if (null != in) { |
| Yaml yaml = new Yaml(new SafeConstructor()); |
| Map ret = (Map) yaml.load(new InputStreamReader(in)); |
| if (null != ret) { |
| return new HashMap(ret); |
| } else { |
| confFileEmpty = true; |
| } |
| } |
| |
| if (mustExist) { |
| if(confFileEmpty) |
| throw new RuntimeException("Config file " + name + " doesn't have any valid storm configs"); |
| else |
| throw new RuntimeException("Could not find config file on classpath " + name); |
| } else { |
| return new HashMap(); |
| } |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } finally { |
| if (null != in) { |
| try { |
| in.close(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| } |
| |
| private static InputStream getConfigFileInputStream(String configFilePath) |
| throws IOException { |
| if (null == configFilePath) { |
| throw new IOException( |
| "Could not find config file, name not specified"); |
| } |
| |
| HashSet<URL> resources = new HashSet<URL>(findResources(configFilePath)); |
| if (resources.isEmpty()) { |
| File configFile = new File(configFilePath); |
| if (configFile.exists()) { |
| return new FileInputStream(configFile); |
| } |
| } else if (resources.size() > 1) { |
| throw new IOException( |
| "Found multiple " + configFilePath |
| + " resources. You're probably bundling the Storm jars with your topology jar. " |
| + resources); |
| } else { |
| LOG.debug("Using "+configFilePath+" from resources"); |
| URL resource = resources.iterator().next(); |
| return resource.openStream(); |
| } |
| return null; |
| } |
| |
| |
| public static Map findAndReadConfigFile(String name) { |
| return findAndReadConfigFile(name, true); |
| } |
| |
| public static Map readDefaultConfig() { |
| return findAndReadConfigFile("defaults.yaml", true); |
| } |
| |
| public static Map readCommandLineOpts() { |
| Map ret = new HashMap(); |
| String commandOptions = System.getProperty("storm.options"); |
| if (commandOptions != null) { |
| String[] configs = commandOptions.split(","); |
| for (String config : configs) { |
| config = URLDecoder.decode(config); |
| String[] options = config.split("=", 2); |
| if (options.length == 2) { |
| Object val = options[1]; |
| try { |
| val = JSONValue.parseWithException(options[1]); |
| } catch (ParseException ignored) { |
| //fall back to string, which is already set |
| } |
| ret.put(options[0], val); |
| } |
| } |
| } |
| return ret; |
| } |
| |
| public static Map readStormConfig() { |
| Map ret = readDefaultConfig(); |
| String confFile = System.getProperty("storm.conf.file"); |
| Map storm; |
| if (confFile == null || confFile.equals("")) { |
| storm = findAndReadConfigFile("storm.yaml", false); |
| } else { |
| storm = findAndReadConfigFile(confFile, true); |
| } |
| ret.putAll(storm); |
| ret.putAll(readCommandLineOpts()); |
| return ret; |
| } |
| |
| private static Object normalizeConf(Object conf) { |
| if (conf == null) return new HashMap(); |
| if (conf instanceof Map) { |
| Map<Object, Object> confMap = new HashMap((Map) conf); |
| for (Map.Entry<Object, Object> entry : confMap.entrySet()) { |
| confMap.put(entry.getKey(), normalizeConf(entry.getValue())); |
| } |
| return confMap; |
| } else if (conf instanceof List) { |
| List confList = new ArrayList((List) conf); |
| for (int i = 0; i < confList.size(); i++) { |
| Object val = confList.get(i); |
| confList.set(i, normalizeConf(val)); |
| } |
| return confList; |
| } else if (conf instanceof Integer) { |
| return ((Integer) conf).longValue(); |
| } else if (conf instanceof Float) { |
| return ((Float) conf).doubleValue(); |
| } else { |
| return conf; |
| } |
| } |
| |
| public static boolean isValidConf(Map<String, Object> stormConf) { |
| return normalizeConf(stormConf).equals(normalizeConf((Map) JSONValue.parse(JSONValue.toJSONString(stormConf)))); |
| } |
| |
| public static Object getSetComponentObject(ComponentObject obj) { |
| if (obj.getSetField() == ComponentObject._Fields.SERIALIZED_JAVA) { |
| return Utils.javaDeserialize(obj.get_serialized_java(), Serializable.class); |
| } else if (obj.getSetField() == ComponentObject._Fields.JAVA_OBJECT) { |
| return obj.get_java_object(); |
| } else { |
| return obj.get_shell(); |
| } |
| } |
| |
| public static <S, T> T get(Map<S, T> m, S key, T def) { |
| T ret = m.get(key); |
| if (ret == null) { |
| ret = def; |
| } |
| return ret; |
| } |
| |
| public static List<Object> tuple(Object... values) { |
| List<Object> ret = new ArrayList<Object>(); |
| for (Object v : values) { |
| ret.add(v); |
| } |
| return ret; |
| } |
| |
| public static void downloadFromMaster(Map conf, String file, String localFile) throws AuthorizationException, IOException, TException { |
| NimbusClient client = NimbusClient.getConfiguredClient(conf); |
| try { |
| download(client, file, localFile); |
| } finally { |
| client.close(); |
| } |
| } |
| |
| public static void downloadFromHost(Map conf, String file, String localFile, String host, int port) throws IOException, TException, AuthorizationException { |
| NimbusClient client = new NimbusClient (conf, host, port, null); |
| try { |
| download(client, file, localFile); |
| } finally { |
| client.close(); |
| } |
| } |
| |
| private static void download(NimbusClient client, String file, String localFile) throws IOException, TException, AuthorizationException { |
| WritableByteChannel out = Channels.newChannel(new FileOutputStream(localFile)); |
| try { |
| String id = client.getClient().beginFileDownload(file); |
| while (true) { |
| ByteBuffer chunk = client.getClient().downloadChunk(id); |
| int written = out.write(chunk); |
| if (written == 0) break; |
| } |
| } finally { |
| out.close(); |
| } |
| } |
| |
| public static IFn loadClojureFn(String namespace, String name) { |
| try { |
| clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")")); |
| } catch (Exception e) { |
| //if playing from the repl and defining functions, file won't exist |
| } |
| return (IFn) RT.var(namespace, name).deref(); |
| } |
| |
| public static boolean isSystemId(String id) { |
| return id.startsWith("__"); |
| } |
| |
| public static <K, V> Map<V, K> reverseMap(Map<K, V> map) { |
| Map<V, K> ret = new HashMap<V, K>(); |
| for (Map.Entry<K, V> entry : map.entrySet()) { |
| ret.put(entry.getValue(), entry.getKey()); |
| } |
| return ret; |
| } |
| |
| public static ComponentCommon getComponentCommon(StormTopology topology, String id) { |
| if (topology.get_spouts().containsKey(id)) { |
| return topology.get_spouts().get(id).get_common(); |
| } |
| if (topology.get_bolts().containsKey(id)) { |
| return topology.get_bolts().get(id).get_common(); |
| } |
| if (topology.get_state_spouts().containsKey(id)) { |
| return topology.get_state_spouts().get(id).get_common(); |
| } |
| throw new IllegalArgumentException("Could not find component with id " + id); |
| } |
| |
| public static Integer getInt(Object o) { |
| Integer result = getInt(o, null); |
| if (null == result) { |
| throw new IllegalArgumentException("Don't know how to convert null to int"); |
| } |
| return result; |
| } |
| |
| public static Integer getInt(Object o, Integer defaultValue) { |
| if (null == o) { |
| return defaultValue; |
| } |
| |
| if (o instanceof Integer || |
| o instanceof Short || |
| o instanceof Byte) { |
| return ((Number) o).intValue(); |
| } else if (o instanceof Long) { |
| final long l = (Long) o; |
| if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) { |
| return (int) l; |
| } |
| } else if (o instanceof String) { |
| return Integer.parseInt((String) o); |
| } |
| |
| throw new IllegalArgumentException("Don't know how to convert " + o + " to int"); |
| } |
| |
| public static Double getDouble(Object o) { |
| Double result = getDouble(o, null); |
| if (null == result) { |
| throw new IllegalArgumentException("Don't know how to convert null to double"); |
| } |
| return result; |
| } |
| |
| public static Double getDouble(Object o, Double defaultValue) { |
| if (null == o) { |
| return defaultValue; |
| } |
| if (o instanceof Number) { |
| return ((Number) o).doubleValue(); |
| } else { |
| throw new IllegalArgumentException("Don't know how to convert " + o + " + to double"); |
| } |
| } |
| |
| public static boolean getBoolean(Object o, boolean defaultValue) { |
| if (null == o) { |
| return defaultValue; |
| } |
| if (o instanceof Boolean) { |
| return (Boolean) o; |
| } else { |
| throw new IllegalArgumentException("Don't know how to convert " + o + " + to boolean"); |
| } |
| } |
| |
| public static String getString(Object o, String defaultValue) { |
| if (null == o) { |
| return defaultValue; |
| } |
| if (o instanceof String) { |
| return (String) o; |
| } else { |
| throw new IllegalArgumentException("Don't know how to convert " + o + " + to String"); |
| } |
| } |
| |
| public static long secureRandomLong() { |
| return UUID.randomUUID().getLeastSignificantBits(); |
| } |
| |
| public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) { |
| return newCurator(conf, servers, port, root, null); |
| } |
| |
| public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) { |
| List<String> serverPorts = new ArrayList<String>(); |
| for (String zkServer : (List<String>) servers) { |
| serverPorts.add(zkServer + ":" + Utils.getInt(port)); |
| } |
| String zkStr = StringUtils.join(serverPorts, ",") + root; |
| CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); |
| |
| setupBuilder(builder, zkStr, conf, auth); |
| |
| return builder.build(); |
| } |
| |
| protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth) |
| { |
| builder.connectString(zkStr) |
| .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT))) |
| .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT))) |
| .retryPolicy(new StormBoundedExponentialBackoffRetry( |
| Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)), |
| Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)), |
| Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)))); |
| |
| if (auth != null && auth.scheme != null && auth.payload != null) { |
| builder = builder.authorization(auth.scheme, auth.payload); |
| } |
| } |
| |
| public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) { |
| return newCurator(conf, servers, port, "", auth); |
| } |
| |
| public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) { |
| CuratorFramework ret = newCurator(conf, servers, port, root, auth); |
| ret.start(); |
| return ret; |
| } |
| |
| public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) { |
| CuratorFramework ret = newCurator(conf, servers, port, auth); |
| ret.start(); |
| return ret; |
| } |
| |
| public static TreeMap<Integer, Integer> integerDivided(int sum, int numPieces) { |
| int base = sum / numPieces; |
| int numInc = sum % numPieces; |
| int numBases = numPieces - numInc; |
| TreeMap<Integer, Integer> ret = new TreeMap<Integer, Integer>(); |
| ret.put(base, numBases); |
| if (numInc != 0) { |
| ret.put(base+1, numInc); |
| } |
| return ret; |
| } |
| |
| public static byte[] toByteArray(ByteBuffer buffer) { |
| byte[] ret = new byte[buffer.remaining()]; |
| buffer.get(ret, 0, ret.length); |
| return ret; |
| } |
| |
| public static void readAndLogStream(String prefix, InputStream in) { |
| try { |
| BufferedReader r = new BufferedReader(new InputStreamReader(in)); |
| String line = null; |
| while ((line = r.readLine()) != null) { |
| LOG.info("{}:{}", prefix, line); |
| } |
| } catch (IOException e) { |
| LOG.warn("Error whiel trying to log stream", e); |
| } |
| } |
| |
| public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable) { |
| Throwable t = throwable; |
| while (t != null) { |
| if (klass.isInstance(t)) { |
| return true; |
| } |
| t = t.getCause(); |
| } |
| return false; |
| } |
| |
| /** |
| * Is the cluster configured to interact with ZooKeeper in a secure way? |
| * This only works when called from within Nimbus or a Supervisor process. |
| * @param conf the storm configuration, not the topology configuration |
| * @return true if it is configured else false. |
| */ |
| public static boolean isZkAuthenticationConfiguredStormServer(Map conf) { |
| return null != System.getProperty("java.security.auth.login.config") |
| || (conf != null |
| && conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null |
| && !((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty()); |
| } |
| |
| /** |
| * Is the topology configured to have ZooKeeper authentication. |
| * @param conf the topology configuration |
| * @return true if ZK is configured else false |
| */ |
| public static boolean isZkAuthenticationConfiguredTopology(Map conf) { |
| return (conf != null |
| && conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null |
| && !((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty()); |
| } |
| |
| public static List<ACL> getWorkerACL(Map conf) { |
| //This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct perms |
| if (!isZkAuthenticationConfiguredTopology(conf)) { |
| return null; |
| } |
| String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL); |
| if (stormZKUser == null) { |
| throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set"); |
| } |
| String[] split = stormZKUser.split(":",2); |
| if (split.length != 2) { |
| throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user"); |
| } |
| ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL); |
| ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1]))); |
| return ret; |
| } |
| |
| public static String threadDump() { |
| final StringBuilder dump = new StringBuilder(); |
| final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean(); |
| final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100); |
| for (java.lang.management.ThreadInfo threadInfo : threadInfos) { |
| dump.append('"'); |
| dump.append(threadInfo.getThreadName()); |
| dump.append("\" "); |
| final Thread.State state = threadInfo.getThreadState(); |
| dump.append("\n java.lang.Thread.State: "); |
| dump.append(state); |
| final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace(); |
| for (final StackTraceElement stackTraceElement : stackTraceElements) { |
| dump.append("\n at "); |
| dump.append(stackTraceElement); |
| } |
| dump.append("\n\n"); |
| } |
| return dump.toString(); |
| } |
| |
| // Assumes caller is synchronizing |
| private static SerializationDelegate getSerializationDelegate(Map stormConf) { |
| String delegateClassName = (String)stormConf.get(Config.STORM_META_SERIALIZATION_DELEGATE); |
| SerializationDelegate delegate; |
| try { |
| Class delegateClass = Class.forName(delegateClassName); |
| delegate = (SerializationDelegate) delegateClass.newInstance(); |
| } catch (ClassNotFoundException e) { |
| LOG.error("Failed to construct serialization delegate, falling back to default", e); |
| delegate = new DefaultSerializationDelegate(); |
| } catch (InstantiationException e) { |
| LOG.error("Failed to construct serialization delegate, falling back to default", e); |
| delegate = new DefaultSerializationDelegate(); |
| } catch (IllegalAccessException e) { |
| LOG.error("Failed to construct serialization delegate, falling back to default", e); |
| delegate = new DefaultSerializationDelegate(); |
| } |
| delegate.prepare(stormConf); |
| return delegate; |
| } |
| |
| public static void handleUncaughtException(Throwable t) { |
| if (t != null && t instanceof Error) { |
| if (t instanceof OutOfMemoryError) { |
| try { |
| System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName()); |
| } catch (Throwable err) { |
| //Again we don't want to exit because of logging issues. |
| } |
| Runtime.getRuntime().halt(-1); |
| } else { |
| //Running in daemon mode, we would pass Error to calling thread. |
| throw (Error) t; |
| } |
| } |
| } |
| |
| /** |
| * Given a File input it will unzip the file in a the unzip directory |
| * passed as the second parameter |
| * @param inFile The zip file as input |
| * @param unzipDir The unzip directory where to unzip the zip file. |
| * @throws IOException |
| */ |
| public static void unZip(File inFile, File unzipDir) throws IOException { |
| Enumeration<? extends ZipEntry> entries; |
| ZipFile zipFile = new ZipFile(inFile); |
| |
| try { |
| entries = zipFile.entries(); |
| while (entries.hasMoreElements()) { |
| ZipEntry entry = entries.nextElement(); |
| if (!entry.isDirectory()) { |
| InputStream in = zipFile.getInputStream(entry); |
| try { |
| File file = new File(unzipDir, entry.getName()); |
| if (!file.getParentFile().mkdirs()) { |
| if (!file.getParentFile().isDirectory()) { |
| throw new IOException("Mkdirs failed to create " + |
| file.getParentFile().toString()); |
| } |
| } |
| OutputStream out = new FileOutputStream(file); |
| try { |
| byte[] buffer = new byte[8192]; |
| int i; |
| while ((i = in.read(buffer)) != -1) { |
| out.write(buffer, 0, i); |
| } |
| } finally { |
| out.close(); |
| } |
| } finally { |
| in.close(); |
| } |
| } |
| } |
| } finally { |
| zipFile.close(); |
| } |
| } |
| |
| /** |
| * Given a zip File input it will return its size |
| * Only works for zip files whose uncompressed size is less than 4 GB, |
| * otherwise returns the size module 2^32, per gzip specifications |
| * @param myFile The zip file as input |
| * @throws IOException |
| * @return zip file size as a long |
| */ |
| public static long zipFileSize(File myFile) throws IOException{ |
| RandomAccessFile raf = new RandomAccessFile(myFile, "r"); |
| raf.seek(raf.length() - 4); |
| long b4 = raf.read(); |
| long b3 = raf.read(); |
| long b2 = raf.read(); |
| long b1 = raf.read(); |
| long val = (b1 << 24) | (b2 << 16) + (b3 << 8) + b4; |
| raf.close(); |
| return val; |
| } |
| |
| public static double zeroIfNaNOrInf(double x) { |
| return (Double.isNaN(x) || Double.isInfinite(x)) ? 0.0 : x; |
| } |
| |
| /** |
| * parses the arguments to extract jvm heap memory size in MB. |
| * @param input |
| * @param defaultValue |
| * @return the value of the JVM heap memory setting (in MB) in a java command. |
| */ |
| public static Double parseJvmHeapMemByChildOpts(String input, Double defaultValue) { |
| if (input != null) { |
| Pattern optsPattern = Pattern.compile("Xmx[0-9]+[mkgMKG]"); |
| Matcher m = optsPattern.matcher(input); |
| String memoryOpts = null; |
| while (m.find()) { |
| memoryOpts = m.group(); |
| } |
| if (memoryOpts != null) { |
| int unit = 1; |
| if (memoryOpts.toLowerCase().endsWith("k")) { |
| unit = 1024; |
| } else if (memoryOpts.toLowerCase().endsWith("m")) { |
| unit = 1024 * 1024; |
| } else if (memoryOpts.toLowerCase().endsWith("g")) { |
| unit = 1024 * 1024 * 1024; |
| } |
| memoryOpts = memoryOpts.replaceAll("[a-zA-Z]", ""); |
| Double result = Double.parseDouble(memoryOpts) * unit / 1024.0 / 1024.0; |
| return (result < 1.0) ? 1.0 : result; |
| } else { |
| return defaultValue; |
| } |
| } else { |
| return defaultValue; |
| } |
| } |
| } |
| |