blob: 9ab4f986bb2380a5ae0ec895b199be66e3c6dc1e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.utils;
import clojure.lang.Keyword;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
import org.apache.curator.ensemble.exhibitor.Exhibitors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.storm.Config;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.BlobStoreAclHandler;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.blobstore.LocalFsBlobStore;
import org.apache.storm.daemon.JarTransformer;
import org.apache.storm.generated.AccessControl;
import org.apache.storm.generated.AccessControlType;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.ComponentObject;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.serialization.DefaultSerializationDelegate;
import org.apache.storm.serialization.SerializationDelegate;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.URL;
import java.net.URLDecoder;
import java.net.UnknownHostException;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermission;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import org.apache.storm.generated.InvalidTopologyException;
import clojure.lang.RT;
public class Utils {
// Singleton delegation target for the static facade methods in this class.
// Tests can swap in a mocking subclass via setInstance() and restore the
// previous instance afterwards; production code always uses this default.
private static Utils _instance = new Utils();
/**
 * Swap in the Utils instance that backs the delegated static methods.
 * Intended for tests, which install a mocking subclass and later restore
 * the instance returned from this call.
 *
 * @param u the Utils instance to install
 * @return the instance that was previously installed
 */
public static Utils setInstance(Utils u) {
    final Utils previous = _instance;
    _instance = u;
    return previous;
}
// Shared logger for all Utils helpers.
public static final Logger LOG = LoggerFactory.getLogger(Utils.class);
// Stream id used when a component does not declare one explicitly.
public static final String DEFAULT_STREAM_ID = "default";
// Suffix of the file recording a locally downloaded blob's version.
public static final String DEFAULT_BLOB_VERSION_SUFFIX = ".version";
// Suffix id marking the currently active copy of a blob.
public static final String CURRENT_BLOB_SUFFIX_ID = "current";
public static final String DEFAULT_CURRENT_BLOB_SUFFIX = "." + CURRENT_BLOB_SUFFIX_ID;
// Thrift (de)serializers are not thread safe, so each thread caches its own.
private static ThreadLocal<TSerializer> threadSer = new ThreadLocal<TSerializer>();
private static ThreadLocal<TDeserializer> threadDes = new ThreadLocal<TDeserializer>();
// Pluggable serializer chosen from the local storm config (see static block below).
private static SerializationDelegate serializationDelegate;
// Optional classloader override used by javaDeserialize; null means the default loader.
private static ClassLoader cl = null;
// Storm config read once at class-load time.
private static Map<String, Object> localConf;
// True when running on Windows, detected via the OS environment variable.
public static final boolean IS_ON_WINDOWS = "Windows_NT".equals(System.getenv("OS"));
// Platform-specific file path and classpath separators.
public static final String FILE_PATH_SEPARATOR = System.getProperty("file.separator");
public static final String CLASS_PATH_SEPARATOR = System.getProperty("path.separator");
// POSIX signal numbers used when killing processes.
public static final int SIGKILL = 9;
public static final int SIGTERM = 15;
// Load the local storm configuration once at class-load time and pick the
// serialization delegate it specifies; both are used by the helpers below.
static {
localConf = readStormConfig();
serializationDelegate = getSerializationDelegate(localConf);
}
/**
 * Instantiate a class by fully-qualified name using its no-arg constructor.
 *
 * @param klass fully-qualified class name
 * @return a new instance of the named class
 * @throws RuntimeException wrapping any reflection failure
 */
@SuppressWarnings("unchecked")
public static <T> T newInstance(String klass) {
    try {
        Class<T> loaded = (Class<T>) Class.forName(klass);
        return newInstance(loaded);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Instantiate the given class through the singleton delegate so tests can
 * override instantiation behavior by subclassing Utils.
 */
public static <T> T newInstance(Class<T> klass) {
return _instance.newInstanceImpl(klass);
}
// Non-static impl methods exist for mocking purposes.
/**
 * Create an instance of {@code klass} with its no-arg constructor.
 * Uses getDeclaredConstructor().newInstance() instead of the deprecated
 * Class.newInstance(), which silently rethrows checked constructor
 * exceptions; all failures are still wrapped in a RuntimeException.
 *
 * @param klass class to instantiate (must have a no-arg constructor)
 * @throws RuntimeException wrapping any reflection/instantiation failure
 */
public <T> T newInstanceImpl(Class<T> klass) {
    try {
        return klass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Instantiate the configured JarTransformer implementation, or return null
 * when no class name is given.
 *
 * @param klass fully-qualified class name, possibly null
 */
public static JarTransformer jarTransformer(String klass) {
JarTransformer ret = null;
if (klass != null) {
ret = (JarTransformer)newInstance(klass);
}
return ret;
}
/**
 * Serialize an object with the configured SerializationDelegate.
 */
public static byte[] serialize(Object obj) {
return serializationDelegate.serialize(obj);
}
/**
 * Deserialize bytes produced by {@link #serialize(Object)} into an instance
 * of {@code clazz} using the configured SerializationDelegate.
 */
public static <T> T deserialize(byte[] serialized, Class<T> clazz) {
return serializationDelegate.deserialize(serialized, clazz);
}
/**
 * Deserialize a slice of a byte array into a new Thrift object of the
 * given class.
 *
 * @param c Thrift-generated class to instantiate (no-arg constructor)
 * @param b buffer holding the serialized form
 * @param offset start of the serialized bytes within b
 * @param length number of bytes to read
 * @throws RuntimeException wrapping instantiation or Thrift failures
 */
public static <T> T thriftDeserialize(Class<T> c, byte[] b, int offset, int length) {
    try {
        // getDeclaredConstructor().newInstance() replaces the deprecated
        // Class.newInstance(), which rethrows checked constructor exceptions.
        T ret = c.getDeclaredConstructor().newInstance();
        TDeserializer des = getDes();
        des.deserialize((TBase) ret, b, offset, length);
        return ret;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Serialize an object with plain Java serialization.
 *
 * @param obj the object to serialize; must be Serializable
 * @return the serialized bytes
 * @throws RuntimeException wrapping any IOException
 */
public static byte[] javaSerialize(Object obj) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    // try-with-resources guarantees the stream is closed (and its buffer
    // flushed into bos) even if writeObject throws.
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(obj);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    return bos.toByteArray();
}
/**
 * Deserialize bytes written by {@link #javaSerialize(Object)}.
 * When the static {@code cl} classloader override is set (testing),
 * classes are resolved through it instead of the default loader.
 *
 * @param serialized bytes of a Java-serialized object
 * @param clazz expected type (used only for the caller-side cast)
 * @throws RuntimeException wrapping IO or class-resolution failures
 */
@SuppressWarnings("unchecked")
public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz) {
    ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
    // try-with-resources closes the stream even if readObject throws.
    try (ObjectInputStream ois = (cl == null)
            ? new ObjectInputStream(bis)
            // Use custom class loader set in testing environment
            : new ClassLoaderObjectInputStream(cl, bis)) {
        return (T) ois.readObject();
    } catch (IOException | ClassNotFoundException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Compress a byte array with gzip.
 *
 * @param data bytes to compress
 * @return the gzip-compressed bytes
 * @throws RuntimeException wrapping any IOException
 */
public static byte[] gzip(byte[] data) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    // try-with-resources finishes and closes the gzip stream even on error.
    try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
        out.write(data);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    return bos.toByteArray();
}
/**
 * Decompress gzip-compressed bytes.
 *
 * @param data gzip-compressed input
 * @return the decompressed bytes
 * @throws RuntimeException wrapping any IOException (e.g. corrupt input)
 */
public static byte[] gunzip(byte[] data) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    // try-with-resources closes the input chain even if a read fails.
    try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
        byte[] buffer = new byte[1024];
        int len;
        while ((len = in.read(buffer)) >= 0) {
            bos.write(buffer, 0, len);
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    return bos.toByteArray();
}
/**
 * Encode a storm conf map as JSON and gzip-compress the result.
 *
 * @param stormConf the configuration map to encode
 * @return gzipped JSON bytes
 * @throws RuntimeException wrapping any IOException
 */
public static byte[] toCompressedJsonConf(Map<String, Object> stormConf) {
    try {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        OutputStreamWriter writer = new OutputStreamWriter(new GZIPOutputStream(buffer));
        JSONValue.writeJSONString(stormConf, writer);
        writer.close();
        return buffer.toByteArray();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Inverse of toCompressedJsonConf: gunzip and JSON-parse a conf map.
 *
 * @param serialized gzipped JSON bytes
 * @return the decoded configuration map
 * @throws RuntimeException wrapping IO or JSON parse failures
 */
public static Map<String, Object> fromCompressedJsonConf(byte[] serialized) {
    try {
        InputStreamReader reader = new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(serialized)));
        Object parsed = JSONValue.parseWithException(reader);
        reader.close();
        return (Map<String, Object>) parsed;
    } catch (IOException | ParseException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Join the string forms of the elements of coll, separated by sep.
 *
 * @param coll elements to join
 * @param sep separator placed between consecutive elements
 * @return the joined string (empty for an empty iterable)
 */
public static <T> String join(Iterable<T> coll, String sep) {
    StringBuilder sb = new StringBuilder();
    boolean first = true;
    for (T item : coll) {
        if (!first) {
            sb.append(sep);
        }
        sb.append(item);
        first = false;
    }
    return sb.toString();
}
/**
 * XOR-fold a list of longs into a single value (0 for an empty list).
 */
public static long bitXorVals(List<Long> coll) {
    long acc = 0L;
    for (int i = 0; i < coll.size(); i++) {
        acc ^= coll.get(i);
    }
    return acc;
}
/**
 * Sleep via the mockable Time facade. If interrupted, restores the
 * thread's interrupt flag and rethrows as a RuntimeException.
 */
public static void sleep(long millis) {
try {
Time.sleep(millis);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
/**
 * Find all classpath resources with the given name, using the current
 * thread's context classloader.
 *
 * @param name resource path to look up
 * @return all matching URLs (possibly empty)
 * @throws RuntimeException wrapping any IOException from the classloader
 */
public static List<URL> findResources(String name) {
    try {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        return Collections.list(loader.getResources(name));
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Locate a YAML config file (classpath first, then filesystem) and parse
 * it into a map.
 *
 * @param name resource or file path of the config file
 * @param mustExist when true, a missing or empty file is an error
 * @return the parsed config (empty map when absent and mustExist is false)
 * @throws RuntimeException on IO errors, a missing required file, or a
 *         required file with no parseable content
 */
public static Map<String, Object> findAndReadConfigFile(String name, boolean mustExist) {
InputStream in = null;
boolean confFileEmpty = false;
try {
in = getConfigFileInputStream(name);
if (null != in) {
// SafeConstructor: parse plain YAML only, no arbitrary Java object instantiation.
Yaml yaml = new Yaml(new SafeConstructor());
@SuppressWarnings("unchecked")
Map<String, Object> ret = (Map<String, Object>) yaml.load(new InputStreamReader(in));
if (null != ret) {
return new HashMap<>(ret);
} else {
// file existed but parsed to nothing
confFileEmpty = true;
}
}
if (mustExist) {
if(confFileEmpty)
throw new RuntimeException("Config file " + name + " doesn't have any valid storm configs");
else
throw new RuntimeException("Could not find config file on classpath " + name);
} else {
return new HashMap<>();
}
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
if (null != in) {
try {
in.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
/**
 * Open a stream for the given config path: a unique classpath resource if
 * one exists, otherwise a plain file. Returns null when neither is found;
 * multiple classpath matches are an error (usually caused by bundling the
 * Storm jars inside the topology jar).
 *
 * @throws IOException when the path is null or ambiguous on the classpath
 */
private static InputStream getConfigFileInputStream(String configFilePath)
throws IOException {
if (null == configFilePath) {
throw new IOException(
"Could not find config file, name not specified");
}
HashSet<URL> resources = new HashSet<URL>(findResources(configFilePath));
if (resources.isEmpty()) {
// not on the classpath; fall back to the local filesystem
File configFile = new File(configFilePath);
if (configFile.exists()) {
return new FileInputStream(configFile);
}
} else if (resources.size() > 1) {
throw new IOException(
"Found multiple " + configFilePath
+ " resources. You're probably bundling the Storm jars with your topology jar. "
+ resources);
} else {
LOG.debug("Using "+configFilePath+" from resources");
URL resource = resources.iterator().next();
return resource.openStream();
}
return null;
}
/**
 * Find and parse a config file that must exist.
 */
public static Map<String, Object> findAndReadConfigFile(String name) {
return findAndReadConfigFile(name, true);
}
/**
 * Read the built-in defaults.yaml shipped with Storm.
 */
public static Map<String, Object> readDefaultConfig() {
return findAndReadConfigFile("defaults.yaml", true);
}
/**
 * Parse the options passed via the "storm.options" system property into a
 * config map. Options are comma-separated "key=value" pairs; each value is
 * URL-decoded and then JSON-parsed, falling back to the raw string when it
 * is not valid JSON.
 *
 * @return map of option name to parsed value (empty when the property is unset)
 */
public static Map<String, Object> readCommandLineOpts() {
    Map<String, Object> ret = new HashMap<>();
    String commandOptions = System.getProperty("storm.options");
    if (commandOptions != null) {
        /*
         Below regex uses negative lookahead to not split in the middle of json objects '{}'
         or json arrays '[]'. This is needed to parse valid json object/arrays passed as options
         via 'storm.cmd' in windows. This is not an issue while using 'storm.py' since it url-encodes
         the options and the below regex just does a split on the commas that separates each option.
         Note:- This regex handles only valid json strings and could produce invalid results
         if the options contain un-encoded invalid json or strings with unmatched '[, ], { or }'. We can
         replace below code with split(",") once 'storm.cmd' is fixed to send url-encoded options.
         */
        String[] configs = commandOptions.split(",(?![^\\[\\]{}]*(]|}))");
        for (String config : configs) {
            try {
                // Explicit UTF-8 replaces the deprecated one-arg decode(),
                // which used the platform default encoding.
                config = URLDecoder.decode(config, "UTF-8");
            } catch (java.io.UnsupportedEncodingException e) {
                // UTF-8 support is mandated by the JVM spec; this cannot happen.
                throw new RuntimeException(e);
            }
            String[] options = config.split("=", 2);
            if (options.length == 2) {
                Object val = options[1];
                try {
                    val = JSONValue.parseWithException(options[1]);
                } catch (ParseException ignored) {
                    // fall back to string, which is already set
                }
                ret.put(options[0], val);
            }
        }
    }
    return ret;
}
/**
 * Assemble the effective storm config: defaults.yaml, overlaid with
 * storm.yaml (or the file named by the "storm.conf.file" system property,
 * which must then exist), overlaid with command-line options from the
 * "storm.options" system property.
 */
public static Map<String, Object> readStormConfig() {
Map<String, Object> ret = readDefaultConfig();
String confFile = System.getProperty("storm.conf.file");
Map<String, Object> storm;
if (confFile == null || confFile.equals("")) {
storm = findAndReadConfigFile("storm.yaml", false);
} else {
storm = findAndReadConfigFile(confFile, true);
}
ret.putAll(storm);
ret.putAll(readCommandLineOpts());
return ret;
}
/**
 * Recursively normalize a config value so it compares equal to the same
 * structure round-tripped through JSON: maps and lists are copied with
 * normalized members, Integers are widened to Long and Floats to Double
 * (JSON parsing yields Long/Double).
 *
 * @param conf any config value (map, list, number, other, or null)
 * @return the normalized copy (null becomes an empty map)
 */
private static Object normalizeConf(Object conf) {
    if (conf == null) {
        return new HashMap();
    }
    if (conf instanceof Map) {
        Map<Object, Object> confMap = new HashMap((Map) conf);
        // entry.setValue is the idiomatic, safe way to replace values while
        // iterating (the original re-put keys mid-iteration).
        for (Map.Entry<Object, Object> entry : confMap.entrySet()) {
            entry.setValue(normalizeConf(entry.getValue()));
        }
        return confMap;
    } else if (conf instanceof List) {
        List confList = new ArrayList((List) conf);
        for (int i = 0; i < confList.size(); i++) {
            confList.set(i, normalizeConf(confList.get(i)));
        }
        return confList;
    } else if (conf instanceof Integer) {
        return ((Integer) conf).longValue();
    } else if (conf instanceof Float) {
        return ((Float) conf).doubleValue();
    } else {
        return conf;
    }
}
/**
 * @return true when stormConf survives a JSON round trip unchanged (after
 * number-widening normalization), i.e. it can safely be shipped as JSON.
 */
public static boolean isValidConf(Map<String, Object> stormConf) {
return normalizeConf(stormConf).equals(normalizeConf((Map) JSONValue.parse(JSONValue.toJSONString(stormConf))));
}
/**
 * Extract whichever representation is set on a ComponentObject union:
 * a deserialized java object, a direct java object, or the shell component.
 */
public static Object getSetComponentObject(ComponentObject obj) {
if (obj.getSetField() == ComponentObject._Fields.SERIALIZED_JAVA) {
return Utils.javaDeserialize(obj.get_serialized_java(), Serializable.class);
} else if (obj.getSetField() == ComponentObject._Fields.JAVA_OBJECT) {
return obj.get_java_object();
} else {
return obj.get_shell();
}
}
/**
 * Look up key in m, returning def when the key is absent or mapped to null.
 */
public static <S, T> T get(Map<S, T> m, S key, T def) {
    T value = m.get(key);
    return (value != null) ? value : def;
}
/**
 * Build a mutable List containing the given values in order.
 */
public static List<Object> tuple(Object... values) {
    return new ArrayList<Object>(Arrays.asList(values));
}
/**
 * Create a Localizer rooted at baseDir for caching blobs locally.
 */
public static Localizer createLocalizer(Map conf, String baseDir) {
return new Localizer(conf, baseDir);
}
/**
 * Build and prepare the blob store client configured under
 * Config.SUPERVISOR_BLOBSTORE.
 */
public static ClientBlobStore getClientBlobStoreForSupervisor(Map conf) {
ClientBlobStore store = (ClientBlobStore) newInstance(
(String) conf.get(Config.SUPERVISOR_BLOBSTORE));
store.prepare(conf);
return store;
}
/**
 * Build nimbus's blob store with no explicit base directory.
 */
public static BlobStore getNimbusBlobStore(Map conf, NimbusInfo nimbusInfo) {
return getNimbusBlobStore(conf, null, nimbusInfo);
}
/**
 * Build and prepare nimbus's blob store (Config.NIMBUS_BLOBSTORE, or
 * LocalFsBlobStore when unset), forcing blob cleanup on since only nimbus
 * should perform cleanup.
 */
public static BlobStore getNimbusBlobStore(Map conf, String baseDir, NimbusInfo nimbusInfo) {
String type = (String)conf.get(Config.NIMBUS_BLOBSTORE);
if (type == null) {
type = LocalFsBlobStore.class.getName();
}
BlobStore store = (BlobStore) newInstance(type);
HashMap nconf = new HashMap(conf);
// only enable cleanup of blobstore on nimbus
nconf.put(Config.BLOBSTORE_CLEANUP_ENABLE, Boolean.TRUE);
if(store != null) {
// store can be null during testing when mocking utils.
store.prepare(nconf, baseDir, nimbusInfo);
}
return store;
}
/**
 * Download a blob from the blob store to a local file, delegating to the
 * singleton instance so tests can override the behavior.
 * Meant to be called only by the supervisor for stormjar/stormconf/stormcode files.
 * @param key blob store key to fetch
 * @param localFile local path to write the blob to
 * @param cb client blob store to read from
 * @throws AuthorizationException
 * @throws KeyNotFoundException
 * @throws IOException
 */
public static void downloadResourcesAsSupervisor(String key, String localFile,
ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
_instance.downloadResourcesAsSupervisorImpl(key, localFile, cb);
}
/**
 * Instance implementation of downloadResourcesAsSupervisor: retries the
 * download a bounded number of times, pausing between attempts.
 */
public void downloadResourcesAsSupervisorImpl(String key, String localFile,
ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
final int MAX_RETRY_ATTEMPTS = 2;
final int ATTEMPTS_INTERVAL_TIME = 100;
for (int retryAttempts = 0; retryAttempts < MAX_RETRY_ATTEMPTS; retryAttempts++) {
if (downloadResourcesAsSupervisorAttempt(cb, key, localFile)) {
break;
}
Utils.sleep(ATTEMPTS_INTERVAL_TIME);
}
}
/**
 * Build and prepare the client-side blob store configured under
 * Config.CLIENT_BLOBSTORE.
 */
public static ClientBlobStore getClientBlobStore(Map conf) {
ClientBlobStore store = (ClientBlobStore) Utils.newInstance((String) conf.get(Config.CLIENT_BLOBSTORE));
store.prepare(conf);
return store;
}
/**
 * Single download attempt: stream the blob to localFile and compare the
 * downloaded byte count against the advertised length. On any failure the
 * partially written file is deleted.
 *
 * NOTE(review): downloadFileSize is an int, so blobs over 2GB would
 * overflow the comparison — confirm expected blob sizes.
 *
 * @return true when the complete blob was written
 */
private static boolean downloadResourcesAsSupervisorAttempt(ClientBlobStore cb, String key, String localFile) {
boolean isSuccess = false;
try (FileOutputStream out = new FileOutputStream(localFile);
InputStreamWithMeta in = cb.getBlob(key);) {
long fileSize = in.getFileLength();
byte[] buffer = new byte[1024];
int len;
int downloadFileSize = 0;
while ((len = in.read(buffer)) >= 0) {
out.write(buffer, 0, len);
downloadFileSize += len;
}
isSuccess = (fileSize == downloadFileSize);
} catch (TException | IOException e) {
LOG.error("An exception happened while downloading {} from blob store.", localFile, e);
}
if (!isSuccess) {
try {
Files.deleteIfExists(Paths.get(localFile));
} catch (IOException ex) {
LOG.error("Failed trying to delete the partially downloaded {}", localFile, ex);
}
}
return isSuccess;
}
/**
 * @return true when the given file (or other filesystem object) exists.
 */
public static boolean checkFileExists(File path) {
return Files.exists(path.toPath());
}
/**
 * @return true when a filesystem object exists at the given path.
 */
public static boolean checkFileExists(String path) {
return Files.exists(new File(path).toPath());
}
/**
 * @return true when file exists directly under dir.
 */
public static boolean checkFileExists(String dir, String file) {
return checkFileExists(dir + FILE_PATH_SEPARATOR + file);
}
/**
 * @return true when dir exists and is a directory.
 */
public static boolean CheckDirExists(String dir) {
    return new File(dir).isDirectory();
}
/**
 * Look up the current version of a blob on nimbus.
 *
 * @param key blob key
 * @param cb client blob store to query
 * @return the blob's version from its metadata
 */
public static long nimbusVersionOfBlob(String key, ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException {
long nimbusBlobVersion = 0;
ReadableBlobMeta metadata = cb.getBlobMeta(key);
nimbusBlobVersion = metadata.get_version();
return nimbusBlobVersion;
}
/**
 * @return the name of the owner of the file at the given path.
 * @throws IOException if the file cannot be inspected
 */
public static String getFileOwner(String path) throws IOException {
    return Files.getOwner(Paths.get(path)).getName();
}
/**
 * Read the locally recorded version of a blob from its companion
 * "&lt;localFile&gt;.version" file.
 *
 * @param localFile path of the blob (without the version suffix)
 * @return the recorded version, or -1 when no version file exists
 * @throws RuntimeException wrapping IO failures while reading
 * @throws NumberFormatException when the file's first line is not a long
 */
public static long localVersionOfBlob(String localFile) {
    File versionFile = new File(localFile + DEFAULT_BLOB_VERSION_SUFFIX);
    if (!versionFile.exists() || versionFile.isDirectory()) {
        return -1;
    }
    // try-with-resources replaces the manual close-in-finally dance and
    // closes the reader even when reading or parsing fails.
    try (BufferedReader br = new BufferedReader(new FileReader(versionFile))) {
        return Long.parseLong(br.readLine());
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
/**
 * @return "&lt;fileName&gt;.&lt;version&gt;", the on-disk name of a specific blob version.
 */
public static String constructBlobWithVersionFileName(String fileName, long version) {
    return new StringBuilder(fileName).append('.').append(version).toString();
}
/**
 * @return the symlink name pointing at the currently active blob copy
 * ("&lt;fileName&gt;.current").
 */
public static String constructBlobCurrentSymlinkName(String fileName) {
return fileName + Utils.DEFAULT_CURRENT_BLOB_SUFFIX;
}
/**
 * @return the name of the version file for a blob ("&lt;fileName&gt;.version").
 */
public static String constructVersionFileName(String fileName) {
return fileName + Utils.DEFAULT_BLOB_VERSION_SUFFIX;
}
// only works on operating systems that support posix
/**
 * Restrict a path's permissions to owner rwx plus group r-x.
 *
 * @param baseDir path whose permissions are replaced
 * @throws RuntimeException wrapping any IOException
 */
public static void restrictPermissions(String baseDir) {
    Set<PosixFilePermission> perms = new HashSet<PosixFilePermission>();
    perms.add(PosixFilePermission.OWNER_READ);
    perms.add(PosixFilePermission.OWNER_WRITE);
    perms.add(PosixFilePermission.OWNER_EXECUTE);
    perms.add(PosixFilePermission.GROUP_READ);
    perms.add(PosixFilePermission.GROUP_EXECUTE);
    try {
        Files.setPosixFilePermissions(FileSystems.getDefault().getPath(baseDir), perms);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Resolve a Clojure function by namespace and name, requiring the
 * namespace first so its definitions are loaded.
 */
public static synchronized clojure.lang.IFn loadClojureFn(String namespace, String name) {
try {
clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
} catch (Exception e) {
//if playing from the repl and defining functions, file won't exist
}
return (clojure.lang.IFn) RT.var(namespace, name).deref();
}
/**
 * @return true for framework-internal ("__"-prefixed) ids.
 */
public static boolean isSystemId(String id) {
return id.startsWith("__");
}
/**
 * Fetch the ComponentCommon of the spout, bolt, or state spout with the
 * given id.
 *
 * @throws IllegalArgumentException when no component has that id
 */
public static ComponentCommon getComponentCommon(StormTopology topology, String id) {
if (topology.get_spouts().containsKey(id)) {
return topology.get_spouts().get(id).get_common();
}
if (topology.get_bolts().containsKey(id)) {
return topology.get_bolts().get(id).get_common();
}
if (topology.get_state_spouts().containsKey(id)) {
return topology.get_state_spouts().get(id).get_common();
}
throw new IllegalArgumentException("Could not find component with id " + id);
}
/**
 * Coerce a config value into a list of strings: null becomes an empty
 * list, a String becomes a singleton list, and a Collection is mapped
 * element-wise via toString().
 *
 * @param o null, a String, or a Collection
 * @throws IllegalArgumentException for any other type
 */
public static List<String> getStrings(final Object o) {
    if (o == null) {
        return new ArrayList<String>();
    } else if (o instanceof String) {
        // plain ArrayList instead of the double-brace anonymous-subclass
        // idiom, which allocates an extra class for no benefit
        List<String> single = new ArrayList<String>();
        single.add((String) o);
        return single;
    } else if (o instanceof Collection) {
        List<String> answer = new ArrayList<String>();
        for (Object v : (Collection) o) {
            answer.add(v.toString());
        }
        return answer;
    } else {
        throw new IllegalArgumentException("Don't know how to convert to string list");
    }
}
/**
 * Convert a non-null object to its string form.
 *
 * @throws IllegalArgumentException when o is null
 */
public static String getString(Object o) {
    if (o == null) {
        throw new IllegalArgumentException("Don't know how to convert null to String");
    }
    return o.toString();
}
/**
 * Convert to int, rejecting null.
 *
 * @throws IllegalArgumentException when o is null or not convertible
 */
public static Integer getInt(Object o) {
Integer result = getInt(o, null);
if (null == result) {
throw new IllegalArgumentException("Don't know how to convert null to int");
}
return result;
}
/**
 * @return this thread's cached TDeserializer, creating it on first use
 * (Thrift deserializers are cached per thread via a ThreadLocal).
 */
private static TDeserializer getDes() {
TDeserializer des = threadDes.get();
if(des == null) {
des = new TDeserializer();
threadDes.set(des);
}
return des;
}
/**
 * Serialize a Thrift object using this thread's cached TSerializer.
 *
 * @throws RuntimeException wrapping any TException (also logged)
 */
public static byte[] thriftSerialize(TBase t) {
try {
TSerializer ser = threadSer.get();
if (ser == null) {
ser = new TSerializer();
threadSer.set(ser);
}
return ser.serialize(t);
} catch (TException e) {
LOG.error("Failed to serialize to thrift: ", e);
throw new RuntimeException(e);
}
}
/**
 * Deserialize a whole byte array into a new Thrift object of class c.
 */
public static <T> T thriftDeserialize(Class<T> c, byte[] b) {
try {
return Utils.thriftDeserialize(c, b, 0, b.length);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
 * Convert a config value to an Integer: accepts Integer/Short/Byte
 * directly, Longs that fit in the int range, and numeric Strings;
 * null yields defaultValue.
 *
 * @throws IllegalArgumentException when o cannot be represented as an int
 * @throws NumberFormatException when o is a non-numeric String
 */
public static Integer getInt(Object o, Integer defaultValue) {
    if (o == null) {
        return defaultValue;
    }
    if (o instanceof Long) {
        long value = (Long) o;
        if (value >= Integer.MIN_VALUE && value <= Integer.MAX_VALUE) {
            return (int) value;
        }
    } else if (o instanceof Integer || o instanceof Short || o instanceof Byte) {
        return ((Number) o).intValue();
    } else if (o instanceof String) {
        return Integer.parseInt((String) o);
    }
    throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
}
/**
 * Convert to double, rejecting null.
 *
 * @throws IllegalArgumentException when o is null or not a Number
 */
public static Double getDouble(Object o) {
    Double result = getDouble(o, null);
    if (null == result) {
        throw new IllegalArgumentException("Don't know how to convert null to double");
    }
    return result;
}
/**
 * Convert any Number to a Double; null yields defaultValue.
 *
 * @throws IllegalArgumentException when o is neither null nor a Number
 */
public static Double getDouble(Object o, Double defaultValue) {
    if (null == o) {
        return defaultValue;
    }
    if (o instanceof Number) {
        return ((Number) o).doubleValue();
    } else {
        // message previously contained a stray " + " ("… + to double")
        throw new IllegalArgumentException("Don't know how to convert " + o + " to double");
    }
}
/**
 * Convert a Boolean config value to boolean; null yields defaultValue.
 *
 * @throws IllegalArgumentException when o is neither null nor a Boolean
 */
public static boolean getBoolean(Object o, boolean defaultValue) {
    if (null == o) {
        return defaultValue;
    }
    if (o instanceof Boolean) {
        return (Boolean) o;
    } else {
        // message previously contained a stray " + " ("… + to boolean")
        throw new IllegalArgumentException("Don't know how to convert " + o + " to boolean");
    }
}
/**
 * Return a String config value; null yields defaultValue.
 *
 * @throws IllegalArgumentException when o is neither null nor a String
 */
public static String getString(Object o, String defaultValue) {
    if (null == o) {
        return defaultValue;
    }
    if (o instanceof String) {
        return (String) o;
    } else {
        // message previously contained a stray " + " ("… + to String")
        throw new IllegalArgumentException("Don't know how to convert " + o + " to String");
    }
}
/**
 * @return a random long taken from the least-significant 64 bits of a
 * freshly generated random UUID.
 */
public static long secureRandomLong() {
return UUID.randomUUID().getLeastSignificantBits();
}
/**
 * Unpack all file entries of a jar into a directory, creating parent
 * directories as needed; directory entries themselves are skipped.
 * Entries whose resolved path would escape toDir ("Zip Slip" path
 * traversal) are rejected with an IOException.
 *
 * @param jarFile the .jar file to unpack
 * @param toDir the destination directory into which to unpack the jar
 * @throws IOException on read/write failure or a path-escaping entry
 */
public static void unJar(File jarFile, File toDir)
    throws IOException {
    JarFile jar = new JarFile(jarFile);
    try {
        Enumeration<JarEntry> entries = jar.entries();
        while (entries.hasMoreElements()) {
            final JarEntry entry = entries.nextElement();
            if (!entry.isDirectory()) {
                File file = new File(toDir, entry.getName());
                // Guard against "Zip Slip": refuse entries (e.g. containing
                // "..") that resolve outside the destination directory.
                if (!file.getCanonicalPath().startsWith(
                        toDir.getCanonicalPath() + File.separator)) {
                    throw new IOException("Jar entry " + entry.getName()
                            + " escapes the destination directory " + toDir);
                }
                InputStream in = jar.getInputStream(entry);
                try {
                    ensureDirectory(file.getParentFile());
                    OutputStream out = new FileOutputStream(file);
                    try {
                        copyBytes(in, out, 8192);
                    } finally {
                        out.close();
                    }
                } finally {
                    in.close();
                }
            }
        }
    } finally {
        jar.close();
    }
}
/**
 * Copies from one stream to another.
 * When out is a PrintStream, its error flag is checked after every write
 * and reported as an IOException (PrintStream itself swallows errors).
 *
 * @param in InputStream to read from
 * @param out OutputStream to write to
 * @param buffSize the size of the transfer buffer
 */
public static void copyBytes(InputStream in, OutputStream out, int buffSize)
    throws IOException {
    final byte[] buffer = new byte[buffSize];
    final PrintStream printStream = (out instanceof PrintStream) ? (PrintStream) out : null;
    for (int n = in.read(buffer); n >= 0; n = in.read(buffer)) {
        out.write(buffer, 0, n);
        if (printStream != null && printStream.checkError()) {
            throw new IOException("Unable to write to output stream.");
        }
    }
}
/**
 * Ensure the existence of a given directory, creating it and any missing
 * parents if needed.
 *
 * @throws IOException if it cannot be created and does not already exist
 */
private static void ensureDirectory(File dir) throws IOException {
    boolean created = dir.mkdirs();
    if (!created && !dir.isDirectory()) {
        throw new IOException("Mkdirs failed to create " +
                              dir.toString());
    }
}
/**
 * Given a Tar File as input it will untar the file in a the untar directory
 * passed as the second parameter, creating that directory if needed.
 * <p/>
 * This utility will untar ".tar" files and ".tar.gz","tgz" files (gzip is
 * assumed whenever the file name ends in "gz").
 * On Windows a pure-Java extraction is used; elsewhere the native tar
 * utility is spawned so archive symlinks are handled.
 *
 * @param inFile The tar file as input.
 * @param untarDir The untar directory where to untar the tar file.
 * @throws IOException
 */
public static void unTar(File inFile, File untarDir) throws IOException {
if (!untarDir.mkdirs()) {
if (!untarDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + untarDir);
}
}
boolean gzipped = inFile.toString().endsWith("gz");
if (isOnWindows()) {
// Tar is not native to Windows. Use simple Java based implementation for
// tests and simple tar archives
unTarUsingJava(inFile, untarDir, gzipped);
} else {
// spawn tar utility to untar archive for full fledged unix behavior such
// as resolving symlinks in tar archives
unTarUsingTar(inFile, untarDir, gzipped);
}
}
/**
 * Untar by spawning "bash -c" running gzip/tar, preserving full unix tar
 * semantics such as symlink resolution.
 *
 * NOTE(review): inFile and untarDir are single-quoted, but the tar
 * argument on the non-gzip path is not; paths containing quotes or shell
 * metacharacters could break or inject into the command line — confirm
 * callers only pass controlled paths.
 *
 * @throws IOException when the spawned pipeline exits non-zero
 */
private static void unTarUsingTar(File inFile, File untarDir,
boolean gzipped) throws IOException {
StringBuffer untarCommand = new StringBuffer();
if (gzipped) {
untarCommand.append(" gzip -dc '");
untarCommand.append(inFile.toString());
untarCommand.append("' | (");
}
untarCommand.append("cd '");
untarCommand.append(untarDir.toString());
untarCommand.append("' ; ");
untarCommand.append("tar -xf ");
if (gzipped) {
untarCommand.append(" -)");
} else {
untarCommand.append(inFile.toString());
}
String[] shellCmd = {"bash", "-c", untarCommand.toString()};
ShellUtils.ShellCommandExecutor shexec = new ShellUtils.ShellCommandExecutor(shellCmd);
shexec.execute();
int exitcode = shexec.getExitCode();
if (exitcode != 0) {
throw new IOException("Error untarring file " + inFile +
". Tar process exited with exit code " + exitcode);
}
}
/**
 * Pure-Java untar using commons-compress, used on Windows where no native
 * tar is available. Iterates the archive entry by entry and delegates
 * extraction to unpackEntries.
 */
private static void unTarUsingJava(File inFile, File untarDir,
boolean gzipped) throws IOException {
InputStream inputStream = null;
try {
if (gzipped) {
inputStream = new BufferedInputStream(new GZIPInputStream(
new FileInputStream(inFile)));
} else {
inputStream = new BufferedInputStream(new FileInputStream(inFile));
}
try (TarArchiveInputStream tis = new TarArchiveInputStream(inputStream)) {
for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
unpackEntries(tis, entry, untarDir);
entry = tis.getNextTarEntry();
}
}
} finally {
if(inputStream != null) {
inputStream.close();
}
}
}
/**
 * Recursively unpack a single tar entry (and, for directory entries,
 * their children) beneath outputDir.
 *
 * @throws IOException on failed directory creation, write failures, or
 *         file entries whose path would escape outputDir
 */
private static void unpackEntries(TarArchiveInputStream tis,
    TarArchiveEntry entry, File outputDir) throws IOException {
    if (entry.isDirectory()) {
        File subDir = new File(outputDir, entry.getName());
        if (!subDir.mkdirs() && !subDir.isDirectory()) {
            throw new IOException("Mkdirs failed to create tar internal dir "
                    + outputDir);
        }
        for (TarArchiveEntry e : entry.getDirectoryEntries()) {
            unpackEntries(tis, e, subDir);
        }
        return;
    }
    File outputFile = new File(outputDir, entry.getName());
    // Guard against "Zip Slip": refuse entries (e.g. containing "..")
    // that resolve outside the target directory.
    if (!outputFile.getCanonicalPath().startsWith(
            outputDir.getCanonicalPath() + File.separator)) {
        throw new IOException("Tar entry " + entry.getName()
                + " escapes the target dir " + outputDir);
    }
    if (!outputFile.getParentFile().exists()) {
        if (!outputFile.getParentFile().mkdirs()) {
            throw new IOException("Mkdirs failed to create tar internal dir "
                    + outputDir);
        }
    }
    // try-with-resources closes the output stream even when a write fails;
    // the original leaked the file descriptor on error.
    try (BufferedOutputStream outputStream = new BufferedOutputStream(
            new FileOutputStream(outputFile))) {
        int count;
        byte[] data = new byte[2048];
        while ((count = tis.read(data)) != -1) {
            outputStream.write(data, 0, count);
        }
        outputStream.flush();
    }
}
/**
 * @return true when the OS environment variable reports Windows_NT.
 */
public static boolean isOnWindows() {
    // equals on the constant is null-safe when OS is unset
    return "Windows_NT".equals(System.getenv("OS"));
}
/**
 * @return true when path is absolute on the default filesystem.
 */
public static boolean isAbsolutePath(String path) {
    return FileSystems.getDefault().getPath(path).isAbsolute();
}
/**
 * Unpack an archive based on its file extension (.jar/.zip/.tar.gz/.tgz/
 * .tar); any other file is moved to dst unchanged (with a warning). The
 * source file is deleted after a successful unpack.
 *
 * @throws IOException on extraction failure or a failed rename
 */
public static void unpack(File localrsrc, File dst) throws IOException {
String lowerDst = localrsrc.getName().toLowerCase();
if (lowerDst.endsWith(".jar")) {
unJar(localrsrc, dst);
} else if (lowerDst.endsWith(".zip")) {
unZip(localrsrc, dst);
} else if (lowerDst.endsWith(".tar.gz") ||
lowerDst.endsWith(".tgz") ||
lowerDst.endsWith(".tar")) {
unTar(localrsrc, dst);
} else {
LOG.warn("Cannot unpack " + localrsrc);
if (!localrsrc.renameTo(dst)) {
throw new IOException("Unable to rename file: [" + localrsrc
+ "] to [" + dst + "]");
}
}
if (localrsrc.isFile()) {
localrsrc.delete();
}
}
/**
 * @return true when the blob's ACLs grant READ either to everyone
 * (OTHER) or to the named user specifically.
 */
public static boolean canUserReadBlob(ReadableBlobMeta meta, String user) {
SettableBlobMeta settable = meta.get_settable();
for (AccessControl acl : settable.get_acl()) {
if (acl.get_type().equals(AccessControlType.OTHER) && (acl.get_access() & BlobStoreAclHandler.READ) > 0) {
return true;
}
if (acl.get_name().equals(user) && (acl.get_access() & BlobStoreAclHandler.READ) > 0) {
return true;
}
}
return false;
}
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) {
return newCurator(conf, servers, port, root, null);
}
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
return newCurator(conf, servers, port, "", auth);
}
/**
 * Builds a new (unstarted) CuratorFramework for the given ZooKeeper
 * ensemble. Every server is paired with the single port value, and the
 * given root path is appended to the resulting connect string.
 * @param conf the storm configuration
 * @param servers the ZK server host names
 * @param port the ZK port (any type Utils.getInt accepts)
 * @param root chroot path appended to the connect string (may be "")
 * @param auth optional ZK authentication info
 * @return the configured, not-yet-started client
 */
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
    int zkPort = Utils.getInt(port);
    List<String> hostPorts = new ArrayList<String>(servers.size());
    for (String server : servers) {
        hostPorts.add(server + ":" + zkPort);
    }
    String connectString = StringUtils.join(hostPorts, ",") + root;
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
    setupBuilder(builder, connectString, conf, auth);
    return builder.build();
}
/**
 * Applies the storm ZooKeeper settings to a Curator builder. When exhibitor
 * servers are configured, an ExhibitorEnsembleProvider is installed (with the
 * plain ZK connect string as backup); otherwise the connect string is used
 * directly. Connection/session timeouts, the bounded exponential retry
 * policy, and optional auth are then applied.
 * @param builder the builder to configure (mutated in place)
 * @param zkStr the "host:port,host:port/chroot" connect string
 * @param conf the storm configuration
 * @param auth optional scheme/payload authorization; skipped when incomplete
 */
protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, final String zkStr, Map conf, ZookeeperAuthInfo auth)
{
List<String> exhibitorServers = getStrings(conf.get(Config.STORM_EXHIBITOR_SERVERS));
if (!exhibitorServers.isEmpty()) {
// use exhibitor servers
builder.ensembleProvider(new ExhibitorEnsembleProvider(
new Exhibitors(exhibitorServers, Utils.getInt(conf.get(Config.STORM_EXHIBITOR_PORT)),
new Exhibitors.BackupConnectionStringProvider() {
@Override
public String getBackupConnectionString() throws Exception {
// use zk servers as backup if they exist
return zkStr;
}}),
new DefaultExhibitorRestClient(),
Utils.getString(conf.get(Config.STORM_EXHIBITOR_URIPATH)),
Utils.getInt(conf.get(Config.STORM_EXHIBITOR_POLL)),
new StormBoundedExponentialBackoffRetry(
Utils.getInt(conf.get(Config.STORM_EXHIBITOR_RETRY_INTERVAL)),
Utils.getInt(conf.get(Config.STORM_EXHIBITOR_RETRY_INTERVAL_CEILING)),
Utils.getInt(conf.get(Config.STORM_EXHIBITOR_RETRY_TIMES)))));
} else {
builder.connectString(zkStr);
}
// Timeouts and retry policy apply in both the exhibitor and plain-ZK cases.
builder
.connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
.sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
.retryPolicy(new StormBoundedExponentialBackoffRetry(
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)),
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES))));
if (auth != null && auth.scheme != null && auth.payload != null) {
builder.authorization(auth.scheme, auth.payload);
}
}
/**
 * Test-only entry point exposing the protected {@code setupBuilder} to tests.
 */
public static void testSetupBuilder(CuratorFrameworkFactory.Builder
builder, String zkStr, Map conf, ZookeeperAuthInfo auth)
{
setupBuilder(builder, zkStr, conf, auth);
}
/**
 * Creates a Curator client rooted at the given path and starts it before
 * returning it. The caller is responsible for closing it.
 */
public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
CuratorFramework ret = newCurator(conf, servers, port, root, auth);
LOG.info("Starting Utils Curator...");
ret.start();
return ret;
}
/**
 * Creates a Curator client with no chroot and starts it before returning it.
 * The caller is responsible for closing it.
 */
public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
CuratorFramework ret = newCurator(conf, servers, port, auth);
LOG.info("Starting Utils Curator (2)...");
ret.start();
return ret;
}
/**
 * Distributes sum over numPieces as evenly as possible and reports the
 * result as a histogram of piece-size to number of pieces of that size.
 * At most two distinct sizes occur (the base size and base+1).
 * e.g. integerDivided(10, 3) -> {3: 2, 4: 1}
 * @param sum the total to divide
 * @param numPieces how many pieces to divide it into
 * @return map from piece size to count of pieces with that size
 */
public static TreeMap<Integer, Integer> integerDivided(int sum, int numPieces) {
    TreeMap<Integer, Integer> sizes = new TreeMap<Integer, Integer>();
    int quotient = sum / numPieces;
    int remainder = sum % numPieces;
    sizes.put(quotient, numPieces - remainder);
    if (remainder != 0) {
        sizes.put(quotient + 1, remainder);
    }
    return sizes;
}
/**
 * Copies the remaining bytes of the buffer into a new array, advancing the
 * buffer's position to its limit.
 * @param buffer the source buffer (position is consumed)
 * @return a new array holding the remaining bytes
 */
public static byte[] toByteArray(ByteBuffer buffer) {
    byte[] copy = new byte[buffer.remaining()];
    buffer.get(copy);
    return copy;
}
/**
 * Reads the stream line by line, logging each line at INFO with the given
 * prefix. IO errors are logged and swallowed. The stream is intentionally
 * left open for the caller.
 * @param prefix tag prepended to every logged line
 * @param in the stream to drain
 */
public static void readAndLogStream(String prefix, InputStream in) {
    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
    try {
        for (String line = reader.readLine(); line != null; line = reader.readLine()) {
            LOG.info("{}:{}", prefix, line);
        }
    } catch (IOException e) {
        LOG.warn("Error while trying to log stream", e);
    }
}
/**
 * Walks the cause chain of a throwable looking for an instance of klass.
 * @param klass The class you're expecting
 * @param throwable The throwable whose cause chain is inspected (may be null)
 * @return true if the throwable or any of its causes is an instance of klass.
 */
public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable) {
    for (Throwable cause = throwable; cause != null; cause = cause.getCause()) {
        if (klass.isInstance(cause)) {
            return true;
        }
    }
    return false;
}
/**
 * Is the cluster configured to interact with ZooKeeper in a secure way?
 * True when a JAAS login config is set on this JVM, or when the storm conf
 * declares a non-empty ZK auth scheme.
 * This only works when called from within Nimbus or a Supervisor process.
 * @param conf the storm configuration, not the topology configuration
 * @return true if it is configured else false.
 */
public static boolean isZkAuthenticationConfiguredStormServer(Map conf) {
    if (System.getProperty("java.security.auth.login.config") != null) {
        return true;
    }
    if (conf == null) {
        return false;
    }
    Object scheme = conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME);
    return scheme != null && !((String) scheme).isEmpty();
}
/**
 * Is the topology configured to have ZooKeeper authentication.
 * @param conf the topology configuration
 * @return true if a non-empty topology ZK auth scheme is configured
 */
public static boolean isZkAuthenticationConfiguredTopology(Map conf) {
    if (conf == null) {
        return false;
    }
    Object scheme = conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME);
    return scheme != null && !((String) scheme).isEmpty();
}
/**
 * Builds the ACL list a worker should use for its ZooKeeper nodes when
 * topology ZK authentication is configured: CREATOR_ALL plus full
 * permissions for the configured super user.
 * @param conf the topology configuration
 * @return the ACL list, or null when ZK authentication is not configured
 * @throws IllegalArgumentException when the super ACL is missing or malformed
 */
public static List<ACL> getWorkerACL(Map conf) {
    // Work around a ZK issue: a sasl super user is not super unless there is
    // an open SASL ACL, so grant the configured super user explicitly.
    if (!isZkAuthenticationConfiguredTopology(conf)) {
        return null;
    }
    String superAcl = (String) conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
    if (superAcl == null) {
        throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set");
    }
    String[] schemeAndId = superAcl.split(":", 2);
    if (schemeAndId.length != 2) {
        throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
    }
    ArrayList<ACL> acls = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
    acls.add(new ACL(ZooDefs.Perms.ALL, new Id(schemeAndId[0], schemeAndId[1])));
    return acls;
}
/**
 * Takes an input dir or file and returns the disk usage of that local
 * directory (sum of file lengths). Symbolic links are skipped so that link
 * targets are not double counted. Very basic implementation.
 *
 * @param dir The input dir to get the disk space of this local dir
 * @return The total disk space of the input local directory
 */
public static long getDU(File dir) {
    if (!dir.exists()) {
        return 0;
    }
    if (!dir.isDirectory()) {
        return dir.length();
    }
    long size = 0;
    // listFiles() can return null on IO error or permission problems.
    File[] allFiles = dir.listFiles();
    if (allFiles != null) {
        for (File f : allFiles) {
            // Use the JDK symlink check instead of commons-io; it does not
            // throw, so no need to pessimistically treat errors as links.
            if (!java.nio.file.Files.isSymbolicLink(f.toPath())) {
                size += getDU(f);
            }
        }
    }
    return size;
}
/**
 * Gets some information, including stack trace, for every live thread in
 * this JVM (up to 100 stack frames per thread).
 * @return A human-readable string of the dump.
 */
public static String threadDump() {
final StringBuilder dump = new StringBuilder();
final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean();
final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
// Thread name in quotes, then the lock it is waiting on (if any) and its owner.
dump.append('"');
dump.append(threadInfo.getThreadName());
dump.append("\" ");
dump.append("\n lock: ");
dump.append(threadInfo.getLockName());
dump.append(" owner: ");
dump.append(threadInfo.getLockOwnerName());
final Thread.State state = threadInfo.getThreadState();
dump.append("\n java.lang.Thread.State: ");
dump.append(state);
final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
for (final StackTraceElement stackTraceElement : stackTraceElements) {
dump.append("\n at ");
dump.append(stackTraceElement);
}
dump.append("\n\n");
}
return dump.toString();
}
/**
 * Creates an instance of the pluggable SerializationDelegate or falls back to
 * DefaultSerializationDelegate if something goes wrong.
 * @param stormConf The config from which to pull the name of the pluggable class.
 * @return an instance of the class specified by storm.meta.serialization.delegate
 */
private static SerializationDelegate getSerializationDelegate(Map stormConf) {
    String className = (String) stormConf.get(Config.STORM_META_SERIALIZATION_DELEGATE);
    SerializationDelegate result;
    try {
        result = (SerializationDelegate) Class.forName(className).newInstance();
    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
        // Fall back rather than fail: serialization must always be available.
        LOG.error("Failed to construct serialization delegate, falling back to default", e);
        result = new DefaultSerializationDelegate();
    }
    result.prepare(stormConf);
    return result;
}
/**
 * Default handling for a throwable that escaped a thread: Errors are fatal.
 * OutOfMemoryError halts the JVM immediately (logging may not be usable at
 * that point, hence the bare System.err); any other Error is rethrown so the
 * calling thread (daemon mode) can react. Non-Error throwables and null are
 * ignored.
 * @param t the uncaught throwable, may be null
 */
public static void handleUncaughtException(Throwable t) {
    // instanceof is false for null, so no separate null check is needed.
    if (t instanceof OutOfMemoryError) {
        try {
            System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
        } catch (Throwable err) {
            // Again we don't want to exit because of logging issues.
        }
        Runtime.getRuntime().halt(-1);
    } else if (t instanceof Error) {
        // Running in daemon mode, we would pass Error to calling thread.
        throw (Error) t;
    }
}
/**
 * Validates that every key referenced by the topology's blob store map is
 * present in the set of keys known to the blob store.
 * @param stormConf the topology configuration
 * @param blobStoreKeys the keys actually present in the blob store
 * @throws InvalidTopologyException when any referenced key is missing
 */
public static void validateTopologyBlobStoreMap(Map<String, ?> stormConf, Set<String> blobStoreKeys) throws InvalidTopologyException {
    @SuppressWarnings("unchecked")
    Map<String, Object> blobStoreMap = (Map<String, Object>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
    if (blobStoreMap == null) {
        return;
    }
    Set<String> missingKeys = new HashSet<>();
    for (String referenced : blobStoreMap.keySet()) {
        if (!blobStoreKeys.contains(referenced)) {
            missingKeys.add(referenced);
        }
    }
    if (!missingKeys.isEmpty()) {
        throw new InvalidTopologyException("The topology blob store map does not " +
                "contain the valid keys to launch the topology " + missingKeys);
    }
}
/**
 * Given a File input it will unzip the file into the unzip directory passed
 * as the second parameter. Entry paths are validated so that a malicious
 * archive cannot write outside of unzipDir ("zip slip").
 * @param inFile The zip file as input
 * @param unzipDir The unzip directory where to unzip the zip file.
 * @throws IOException on extraction failure, when parent directories cannot
 *         be created, or when an entry would escape unzipDir
 */
public static void unZip(File inFile, File unzipDir) throws IOException {
    try (ZipFile zipFile = new ZipFile(inFile)) {
        Enumeration<? extends ZipEntry> entries = zipFile.entries();
        while (entries.hasMoreElements()) {
            ZipEntry entry = entries.nextElement();
            if (entry.isDirectory()) {
                continue;
            }
            File file = new File(unzipDir, entry.getName());
            // Guard against entries like "../../evil" escaping the target dir.
            if (!file.getCanonicalPath().startsWith(unzipDir.getCanonicalPath() + File.separator)) {
                throw new IOException("Zip entry is outside of the target dir: " + entry.getName());
            }
            if (!file.getParentFile().mkdirs() && !file.getParentFile().isDirectory()) {
                throw new IOException("Mkdirs failed to create " + file.getParentFile().toString());
            }
            try (InputStream in = zipFile.getInputStream(entry);
                 OutputStream out = new FileOutputStream(file)) {
                byte[] buffer = new byte[8192];
                int read;
                while ((read = in.read(buffer)) != -1) {
                    out.write(buffer, 0, read);
                }
            }
        }
    }
}
/**
 * Given a gzipped file it returns the uncompressed size recorded in its
 * 4-byte little-endian ISIZE trailer.
 * Only works for files whose uncompressed size is less than 4 GB,
 * otherwise returns the size modulo 2^32, per gzip specifications.
 * @param myFile The gzip file as input
 * @throws IOException on read failure
 * @return uncompressed size as a long
 */
public static long zipFileSize(File myFile) throws IOException {
    // try-with-resources: the original leaked the file handle on error.
    try (RandomAccessFile raf = new RandomAccessFile(myFile, "r")) {
        raf.seek(raf.length() - 4);
        long b4 = raf.read();
        long b3 = raf.read();
        long b2 = raf.read();
        long b1 = raf.read();
        // Combine with ORs only; the original mixed | and + which happens to
        // work (the byte ranges are disjoint) but obscures the intent.
        return (b1 << 24) | (b2 << 16) | (b3 << 8) | b4;
    }
}
/** Maps NaN and +/- infinity to 0.0; returns any finite input unchanged. */
public static double zeroIfNaNOrInf(double x) {
    if (Double.isNaN(x) || Double.isInfinite(x)) {
        return 0.0;
    }
    return x;
}
/**
 * Parses JVM option strings and extracts the first -Xmx heap setting found,
 * converted to MB and clamped to a minimum of 1.0.
 * @param options the JVM option strings to scan (may be null; entries may be null)
 * @param defaultValue returned when no Xmx setting is present
 * @return the value of the JVM heap memory setting (in MB), or defaultValue
 */
public static Double parseJvmHeapMemByChildOpts(List<String> options, Double defaultValue) {
    if (options == null) {
        return defaultValue;
    }
    Pattern optsPattern = Pattern.compile("Xmx([0-9]+)([mkgMKG])");
    for (String option : options) {
        if (option == null) {
            continue;
        }
        Matcher m = optsPattern.matcher(option);
        if (m.find()) {
            // Compute in long: the original multiplied two ints, which
            // overflowed for settings >= 2g (e.g. -Xmx2g yielded 1.0 MB).
            long bytes = Long.parseLong(m.group(1));
            switch (Character.toLowerCase(m.group(2).charAt(0))) {
                case 'k':
                    bytes *= 1024L;
                    break;
                case 'm':
                    bytes *= 1024L * 1024L;
                    break;
                case 'g':
                    bytes *= 1024L * 1024L * 1024L;
                    break;
                default:
                    break;
            }
            double mb = bytes / 1024.0 / 1024.0;
            // Never report less than 1 MB.
            return (mb < 1.0) ? 1.0 : mb;
        }
    }
    return defaultValue;
}
/**
 * Test hook: overrides the class loader used when deserializing Java objects.
 */
@VisibleForTesting
public static void setClassLoaderForJavaDeSerialize(ClassLoader cl) {
Utils.cl = cl;
}
/**
 * Test hook: restores the system class loader for Java deserialization.
 */
@VisibleForTesting
public static void resetClassLoaderForJavaDeSerialize() {
Utils.cl = ClassLoader.getSystemClassLoader();
}
/**
 * Looks up the TopologyInfo for a topology by name via Nimbus.
 * @param name the topology name
 * @param asUser the user to impersonate when talking to Nimbus
 * @param stormConf the storm configuration
 * @return the info, or null when no topology with that name is running
 * @throws RuntimeException wrapping any client failure
 */
public static TopologyInfo getTopologyInfo(String name, String asUser, Map stormConf) {
    try (NimbusClient client = NimbusClient.getConfiguredClientAs(stormConf, asUser)) {
        String topologyId = getTopologyId(name, client.getClient());
        return (topologyId == null) ? null : client.getClient().getTopologyInfo(topologyId);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Resolves a topology name to its id by scanning the cluster summary.
 * @param name the topology name
 * @param client the Nimbus client to query
 * @return the topology id, or null when no topology with that name exists
 * @throws RuntimeException wrapping any client failure
 */
public static String getTopologyId(String name, Nimbus.Client client) {
    try {
        for (TopologySummary summary : client.getClusterInfo().get_topologies()) {
            if (summary.get_name().equals(name)) {
                return summary.get_id();
            }
        }
        return null;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * A cheap way to deterministically convert a number to a positive value. When the input is
 * positive, the original value is returned. When the input number is negative, the returned
 * positive value is the original value bit AND against Integer.MAX_VALUE(0x7fffffff) which
 * is not its absolute value.
 *
 * @param number a given number
 * @return a positive number.
 */
public static int toPositive(int number) {
return number & Integer.MAX_VALUE;
}
/**
 * Builds a GlobalStreamId, substituting the default component id when
 * componentId is null.
 * @param streamId the stream id
 * @param componentId the component id, or null for the default
 * @return the GlobalStreamId
 */
public static GlobalStreamId getGlobalStreamId(String streamId, String componentId) {
    String component = (componentId == null) ? DEFAULT_STREAM_ID : componentId;
    return new GlobalStreamId(streamId, component);
}
/**
 * Returns the exception itself when it already is a RuntimeException,
 * otherwise wraps it in one (preserving it as the cause).
 * @param e the exception to wrap
 * @return a RuntimeException view of e
 */
public static RuntimeException wrapInRuntime(Exception e) {
    return (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
}
/**
 * Tries to bind a ServerSocket to prefferedPort and returns the port that
 * was actually bound. When that port is unavailable, falls back to an
 * ephemeral port (0); returns -1 only when even that fails.
 * @param prefferedPort the port to try first (0 means any free port)
 * @return a usable port number, or -1
 */
public static int getAvailablePort(int prefferedPort) {
    try (ServerSocket socket = new ServerSocket(prefferedPort)) {
        return socket.getLocalPort();
    } catch (IOException exp) {
        return (prefferedPort > 0) ? getAvailablePort(0) : -1;
    }
}
/**
 * Returns an ephemeral port chosen by the OS (equivalent to getAvailablePort(0)).
 */
public static int getAvailablePort() {
return getAvailablePort(0);
}
/**
 * Determines if a zip archive contains a particular directory.
 * The archive is closed after scanning (the original leaked the ZipFile).
 *
 * @param zipfile path to the zipped file
 * @param target directory being looked for in the zip.
 * @return boolean whether or not the directory exists in the zip.
 * @throws IOException when the archive cannot be opened or read
 */
public static boolean zipDoesContainDir(String zipfile, String target) throws IOException {
    String targetDir = target + "/";
    try (ZipFile zip = new ZipFile(zipfile)) {
        Enumeration<? extends ZipEntry> entries = zip.entries();
        while (entries.hasMoreElements()) {
            if (entries.nextElement().getName().startsWith(targetDir)) {
                return true;
            }
        }
    }
    return false;
}
/**
 * Joins any number of maps together into a single map, combining their values into
 * a list, maintaining values in the order the maps were passed in. Nulls are inserted
 * for given keys when the map does not contain that key.
 *
 * i.e. joinMaps({'a' => 1, 'b' => 2}, {'b' => 3}, {'a' => 4, 'c' => 5}) ->
 * {'a' => [1, null, 4], 'b' => [2, 3, null], 'c' => [null, null, 5]}
 *
 * @param maps variable number of maps to join - order affects order of values in output.
 * @return combined map
 */
@SafeVarargs
public static <K, V> Map<K, List<V>> joinMaps(Map<K, V>... maps) {
    Map<K, List<V>> ret = new HashMap<>();
    // Collect the full key universe first so every key gets one slot per map.
    Set<K> keys = new HashSet<>();
    for (Map<K, V> map : maps) {
        keys.addAll(map.keySet());
    }
    for (Map<K, V> map : maps) {
        for (K key : keys) {
            // Single lookup instead of containsKey + put + get.
            List<V> values = ret.get(key);
            if (values == null) {
                values = new ArrayList<>();
                ret.put(key, values);
            }
            values.add(map.get(key));
        }
    }
    return ret;
}
/**
 * Fills up chunks out of a collection (given a maximum amount of chunks).
 * Chunk sizes differ by at most one, with larger chunks first.
 *
 * i.e. partitionFixed(5, [1,2,3]) -> [[1], [2], [3]]
 * partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
 * partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
 * @param maxNumChunks the maximum number of chunks to return
 * @param coll the collection to be chunked up
 * @return a list of the chunks, which are themselves lists.
 */
public static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll) {
    List<List<T>> chunks = new ArrayList<>();
    if (maxNumChunks == 0 || coll == null) {
        return chunks;
    }
    // chunk size -> number of chunks of that size; emit larger sizes first.
    Map<Integer, Integer> parts = integerDivided(coll.size(), maxNumChunks);
    List<Integer> sizesDescending = new ArrayList<Integer>(parts.keySet());
    Collections.sort(sizesDescending, Collections.reverseOrder());
    Iterator<T> source = coll.iterator();
    for (Integer chunkSize : sizesDescending) {
        int count = parts.get(chunkSize);
        for (int c = 0; c < count && source.hasNext(); c++) {
            List<T> chunk = new ArrayList<>();
            for (int i = 0; i < chunkSize && source.hasNext(); i++) {
                chunk.add(source.next());
            }
            chunks.add(chunk);
        }
        if (!source.hasNext()) {
            break;
        }
    }
    return chunks;
}
/**
 * Return a new instance of a pluggable specified in the conf.
 * @param conf The conf to read from.
 * @param configKey The key pointing to the pluggable class
 * @return an instance of the class or null if it is not specified.
 */
public static Object getConfiguredClass(Map conf, Object configKey) {
    if (!conf.containsKey(configKey)) {
        return null;
    }
    return newInstance((String) conf.get(configKey));
}
/**
 * Builds the relative path of a worker's main log file: stormId/port/worker.log.
 */
public static String logsFilename(String stormId, String port) {
return stormId + FILE_PATH_SEPARATOR + port + FILE_PATH_SEPARATOR + "worker.log";
}
/**
 * Builds the relative path of a worker's event log file: stormId/port/events.log.
 */
public static String eventLogsFilename(String stormId, String port) {
return stormId + FILE_PATH_SEPARATOR + port + FILE_PATH_SEPARATOR + "events.log";
}
/**
 * Parses a YAML file using the safe constructor (no arbitrary object
 * instantiation).
 * @param yamlFile path to the file to read
 * @return the parsed object tree, or null when reading or parsing fails
 */
public static Object readYamlFile(String yamlFile) {
    try (FileReader reader = new FileReader(yamlFile)) {
        Yaml yaml = new Yaml(new SafeConstructor());
        return yaml.load(reader);
    } catch (Exception ex) {
        // Best effort: log and return null rather than propagate.
        LOG.error("Failed to read yaml file.", ex);
        return null;
    }
}
/**
 * Installs a JVM-wide default uncaught-exception handler that delegates to
 * handleUncaughtException; if even that throws an Error, the process exits.
 */
public static void setupDefaultUncaughtExceptionHandler() {
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread thread, Throwable thrown) {
try {
handleUncaughtException(thrown);
} catch (Error err) {
// handleUncaughtException rethrows non-OOM Errors; treat that as fatal here.
LOG.error("Received error in main thread.. terminating server...", err);
Runtime.getRuntime().exit(-2);
}
}
});
}
/**
 * Creates a new map with a string value in the map replaced with an
 * equivalently-lengthed string of '#'.
 * @param m The map that a value will be redacted from
 * @param key The key pointing to the value to be redacted
 * @return a new map with the value redacted. The original map will not be modified.
 */
public static Map<Object, String> redactValue(Map<Object, String> m, Object key) {
    if (m.containsKey(key)) {
        HashMap<Object, String> newMap = new HashMap<>(m);
        String value = newMap.get(key);
        // Guard against a null value; the original NPEd on value.length().
        if (value != null) {
            char[] masked = new char[value.length()];
            Arrays.fill(masked, '#');
            newMap.put(key, new String(masked));
        }
        return newMap;
    }
    return m;
}
// Characters that may not appear anywhere in a storm config key name.
private static final Set<String> disallowedKeys = new HashSet<>(Arrays.asList(new String[] {"/", ".", ":", "\\"}));
/**
 * Make sure a given key name is valid for the storm config.
 * Throw RuntimeException if the key isn't valid.
 * @param name The name of the config key to check.
 */
public static void validateKeyName(String name) {
    for (String disallowed : disallowedKeys) {
        if (name.contains(disallowed)) {
            throw new RuntimeException("Key name cannot contain any of the following: " + disallowedKeys.toString());
        }
    }
    if (name.trim().isEmpty()) {
        throw new RuntimeException("Key name cannot be blank");
    }
}
/**
 * Returns this host's canonical hostname (delegates to the mockable impl).
 */
public static String localHostname () throws UnknownHostException {
return _instance.localHostnameImpl();
}
// Non-static impl methods exist for mocking purposes.
/**
 * Resolves the canonical name of the local host via InetAddress.
 */
protected String localHostnameImpl () throws UnknownHostException {
return InetAddress.getLocalHost().getCanonicalHostName();
}
// Caches the result of localHostname(); the reverse lookup can be slow.
private static String memoizedLocalHostnameString = null;
/**
 * Returns the canonical local hostname, computed once and cached.
 * NOTE(review): not synchronized; concurrent first calls may both perform
 * the lookup, which is harmless as the result should be stable.
 */
public static String memoizedLocalHostname () throws UnknownHostException {
if (memoizedLocalHostnameString == null) {
memoizedLocalHostnameString = localHostname();
}
return memoizedLocalHostnameString;
}
/**
 * Gets the storm.local.hostname value, or tries to figure out the local hostname
 * if it is not set in the config.
 * @return a string representation of the hostname.
 * @throws UnknownHostException when the local hostname cannot be resolved
 */
public static String hostname() throws UnknownHostException {
return _instance.hostnameImpl();
}
// Non-static impl methods exist for mocking purposes.
/**
 * Returns the configured storm.local.hostname when present and non-empty;
 * otherwise falls back to the (memoized) resolved local hostname.
 */
protected String hostnameImpl () throws UnknownHostException {
    Object configured = (localConf == null) ? null : localConf.get(Config.STORM_LOCAL_HOSTNAME);
    if (configured == null || configured.equals("")) {
        return memoizedLocalHostname();
    }
    return (String) configured;
}
/** @return a freshly generated random (type 4) UUID as a string. */
public static String uuid() {
    return String.valueOf(UUID.randomUUID());
}
/**
 * Logs a fatal "halting" message (with a synthetic stack trace) and exits
 * the JVM with the given status code.
 */
public static void exitProcess (int val, String msg) {
String combinedErrorMessage = "Halting process: " + msg;
LOG.error(combinedErrorMessage, new RuntimeException(combinedErrorMessage));
Runtime.getRuntime().exit(val);
}
/**
 * Returns a Runnable that terminates this process with status 1, for use
 * as a worker "suicide" callback.
 */
public static Runnable mkSuicideFn() {
return new Runnable() {
@Override
public void run() {
Utils.exitProcess(1, "Worker died");
}
};
}
/**
 * Reverses a map: {a: 1, b: 1, c: 2} -> {1: [a, b], 2: [c]}.
 *
 * The order of the resulting list values depends on the ordering properties
 * of the Map passed in. The caller is responsible for passing an ordered
 * map if they expect the result to be consistently ordered as well.
 *
 * @param map to reverse (null yields an empty result)
 * @return a reversed map
 */
public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
    HashMap<V, List<K>> reversed = new HashMap<V, List<K>>();
    if (map != null) {
        for (Entry<K, V> entry : map.entrySet()) {
            List<K> keysForValue = reversed.get(entry.getValue());
            if (keysForValue == null) {
                keysForValue = new ArrayList<K>();
                reversed.put(entry.getValue(), keysForValue);
            }
            keysForValue.add(entry.getKey());
        }
    }
    return reversed;
}
/**
 * Reverses an assoc-list style map, like reverseMap(Map...):
 * [[a, 1], [b, 1], [c, 2]] -> {1: [a, b], 2: [c]}
 *
 * @param listSeq a list of [key, value] pairs to reverse (null yields empty)
 * @return a map from value to the list of keys that mapped to it
 */
public static HashMap reverseMap(List listSeq) {
    HashMap<Object, List<Object>> reversed = new HashMap();
    if (listSeq == null) {
        return reversed;
    }
    for (Object pairObj : listSeq) {
        List pair = (List) pairObj;
        Object value = pair.get(1);
        List keysForValue = reversed.get(value);
        if (keysForValue == null) {
            keysForValue = new ArrayList<Object>();
            reversed.put(value, keysForValue);
        }
        keysForValue.add(pair.get(0));
    }
    return reversed;
}
/**
 * @return the pid of this JVM, because Java doesn't provide a real way to do this.
 * Parses the "pid@hostname" form reported by the runtime MX bean.
 * @throws RuntimeException when the runtime name is not in that form
 */
public static String processPid() {
    String jvmName = ManagementFactory.getRuntimeMXBean().getName();
    String[] parts = jvmName.split("@");
    if (parts.length != 2) {
        throw new RuntimeException("Got unexpected process name: " + jvmName);
    }
    return parts[0];
}
/**
 * Runs the given command (first element is the executable, the rest are
 * arguments) synchronously and returns its exit value.
 * @param command executable followed by its arguments
 * @return the process exit value
 * @throws ExecuteException when the process fails
 * @throws IOException on IO failure launching the process
 */
public static int execCommand(String... command) throws ExecuteException, IOException {
    CommandLine cmd = new CommandLine(command[0]);
    for (int i = 1; i < command.length; i++) {
        cmd.addArgument(command[i]);
    }
    DefaultExecutor executor = new DefaultExecutor();
    return executor.execute(cmd);
}
/**
 * Extract dir from the jar to destdir (delegates to the mockable impl).
 *
 * @param jarpath Path to the jar file
 * @param dir Directory in the jar to pull out
 * @param destdir Path to the directory where the extracted directory will be put
 */
public static void extractDirFromJar(String jarpath, String dir, File destdir) {
_instance.extractDirFromJarImpl(jarpath, dir, destdir);
}
/**
 * Extracts every non-directory entry under dir from the jar at jarpath into
 * destdir, preserving relative paths. Failures are logged and swallowed,
 * matching the best-effort contract of extractDirFromJar.
 */
public void extractDirFromJarImpl(String jarpath, String dir, File destdir) {
    try (JarFile jarFile = new JarFile(jarpath)) {
        Enumeration<JarEntry> jarEnums = jarFile.entries();
        while (jarEnums.hasMoreElements()) {
            JarEntry entry = jarEnums.nextElement();
            if (!entry.isDirectory() && entry.getName().startsWith(dir)) {
                File aFile = new File(destdir, entry.getName());
                aFile.getParentFile().mkdirs();
                try (FileOutputStream out = new FileOutputStream(aFile);
                     InputStream in = jarFile.getInputStream(entry)) {
                    IOUtils.copy(in, out);
                }
            }
        }
    } catch (IOException e) {
        // Log the cause too; the original dropped the exception entirely.
        LOG.info("Could not extract {} from {}", dir, jarpath, e);
    }
}
/**
 * Sends a signal to the process with the given pid: taskkill on Windows
 * (forced for SIGKILL), "kill -signum" elsewhere. An ExecuteException is
 * treated as "process already dead" and swallowed; IOExceptions are logged
 * and rethrown.
 * @param lpid the target process id
 * @param signum the signal number to send
 * @throws IOException when launching the kill command fails
 */
public static void sendSignalToProcess(long lpid, int signum) throws IOException {
    String pid = Long.toString(lpid);
    try {
        if (!isOnWindows()) {
            execCommand("kill", "-" + signum, pid);
        } else if (signum == SIGKILL) {
            execCommand("taskkill", "/f", "/pid", pid);
        } else {
            execCommand("taskkill", "/pid", pid);
        }
    } catch (ExecuteException e) {
        LOG.info("Error when trying to kill {}. Process is probably already dead.", pid);
    } catch (IOException e) {
        LOG.info("IOException Error when trying to kill {}.", pid);
        throw e;
    }
}
/**
 * Force-kills the process with the given pid (SIGKILL / taskkill /f).
 */
public static void forceKillProcess (String pid) throws IOException {
sendSignalToProcess(Long.parseLong(pid), SIGKILL);
}
/**
 * Asks the process with the given pid to terminate gracefully (SIGTERM).
 */
public static void killProcessWithSigTerm (String pid) throws IOException {
sendSignalToProcess(Long.parseLong(pid), SIGTERM);
}
/**
 * Adds the user supplied function as a shutdown hook for cleanup.
 * Also registers a second hook that sleeps for a second and then halts the
 * runtime, to avoid any zombie process in case the cleanup function hangs.
 * @param func the cleanup to run at shutdown
 */
public static void addShutdownHookWithForceKillIn1Sec (Runnable func) {
    Runnable forceKill = new Runnable() {
        @Override
        public void run() {
            try {
                Time.sleepSecs(1);
                Runtime.getRuntime().halt(20);
            } catch (Exception e) {
                LOG.warn("Exception in the ShutDownHook", e);
            }
        }
    };
    Runtime.getRuntime().addShutdownHook(new Thread(func));
    Runtime.getRuntime().addShutdownHook(new Thread(forceKill));
}
/**
 * Returns the combined string, escaped for posix shell: each non-null
 * element is single-quoted (embedded single quotes become '"'"') and the
 * quoted tokens are joined with single spaces.
 * @param command the list of strings to be combined
 * @return the resulting command string
 */
public static String shellCmd (List<String> command) {
    StringBuilder cmd = new StringBuilder();
    for (String token : command) {
        if (token == null) {
            continue;
        }
        if (cmd.length() > 0) {
            cmd.append(' ');
        }
        cmd.append('\'').append(token.replaceAll("'", "'\"'\"'")).append('\'');
    }
    return cmd.toString();
}
/** Path of the worker launch script ("storm-worker-script.sh") inside dir. */
public static String scriptFilePath (String dir) {
return dir + FILE_PATH_SEPARATOR + "storm-worker-script.sh";
}
/** Path of the container launch script ("launch_container.sh") inside dir. */
public static String containerFilePath (String dir) {
return dir + FILE_PATH_SEPARATOR + "launch_container.sh";
}
/** Returns v unchanged unless it is null, in which case the Integer 0. */
public static Object nullToZero (Object v) {
    if (v == null) {
        return 0;
    }
    return v;
}
/**
 * Deletes a file or directory and its contents if it exists. Does not
 * complain if the input is null or does not exist.
 * @param path the path to the file or directory
 * @throws IOException when deletion fails for an existing path
 */
public static void forceDelete(String path) throws IOException {
_instance.forceDeleteImpl(path);
}
// Non-static impl methods exist for mocking purposes.
/**
 * Deletes the path recursively when it exists; a FileNotFoundException from
 * the delete is ignored (the file may vanish between the check and delete).
 */
protected void forceDeleteImpl(String path) throws IOException {
LOG.debug("Deleting path {}", path);
if (checkFileExists(path)) {
try {
FileUtils.forceDelete(new File(path));
} catch (FileNotFoundException ignored) {}
}
}
/**
 * Returns a Collection of file names found under the given directory.
 * @param dir a directory
 * @return the Collection of file names (empty when dir is not a readable directory)
 */
public static Collection<String> readDirContents(String dir) {
    Collection<String> ret = new HashSet<>();
    // list() returns the names directly; no need to materialize File objects.
    String[] names = new File(dir).list();
    if (names != null) {
        Collections.addAll(ret, names);
    }
    return ret;
}
/**
 * Returns the value of java.class.path System property. Kept separate for
 * testing.
 * @return the classpath
 */
public static String currentClasspath() {
return _instance.currentClasspathImpl();
}
// Non-static impl methods exist for mocking purposes.
/** Reads the JVM's java.class.path system property. */
public String currentClasspathImpl() {
return System.getProperty("java.class.path");
}
/**
 * Appends the given paths to a classpath string (delegates to the mockable impl).
 */
public static String addToClasspath(String classpath,
Collection<String> paths) {
return _instance.addToClasspathImpl(classpath, paths);
}
/**
 * Joins two collections of classpath entries (delegates to the mockable impl).
 */
public static String addToClasspath(Collection<String> classpaths,
Collection<String> paths) {
return _instance.addToClasspathImpl(classpaths, paths);
}
// Non-static impl methods exist for mocking purposes.
/**
 * Appends the given paths to a classpath string using the platform
 * classpath separator; returns the classpath unchanged when paths is null
 * or empty.
 */
public String addToClasspathImpl(String classpath,
Collection<String> paths) {
    if (paths == null || paths.isEmpty()) {
        return classpath;
    }
    List<String> entries = new ArrayList<>(paths.size() + 1);
    entries.add(classpath);
    entries.addAll(paths);
    return StringUtils.join(entries, CLASS_PATH_SEPARATOR);
}
/**
 * Joins two collections of classpath entries into a single classpath
 * string; either collection may be null.
 */
public String addToClasspathImpl(Collection<String> classpaths,
Collection<String> paths) {
    List<String> combined = new ArrayList<>();
    if (classpaths != null) {
        combined.addAll(classpaths);
    }
    if (paths != null) {
        combined.addAll(paths);
    }
    return StringUtils.join(combined, CLASS_PATH_SEPARATOR);
}
/**
 * Measures elapsed seconds since construction, via the mockable Time class.
 */
public static class UptimeComputer {
int startTime = 0;
public UptimeComputer() {
startTime = Time.currentTimeSecs();
}
/** @return seconds elapsed since this object was created. */
public int upTime() {
return Time.deltaSecs(startTime);
}
}
/** Creates an UptimeComputer anchored at the current time. */
public static UptimeComputer makeUptimeComputer() {
return _instance.makeUptimeComputerImpl();
}
// Non-static impl methods exist for mocking purposes.
/** Mockable factory for UptimeComputer instances. */
public UptimeComputer makeUptimeComputerImpl() {
return new UptimeComputer();
}
/**
 * a or b the first one that is not null
 * @param a something
 * @param b something else
 * @return a or b the first one that is not null
 */
public static <V> V OR(V a, V b) {
    if (a != null) {
        return a;
    }
    return b;
}
/**
 * Writes a posix shell script file to be executed in its own process.
 * The script exports the given environment variables (null values become
 * empty strings) and then execs the given command.
 * @param dir the directory under which the script is to be written
 * @param command the command the script is to execute
 * @param environment optional environment variables to set before running the script's command. May be null.
 * @return the path to the script that has been written
 * @throws IOException when the script cannot be written
 */
public static String writeScript(String dir, List<String> command,
        Map<String,String> environment) throws IOException {
    String path = Utils.scriptFilePath(dir);
    try (BufferedWriter out = new BufferedWriter(new FileWriter(path))) {
        out.write("#!/bin/bash");
        out.newLine();
        if (environment != null) {
            // Iterate entries directly instead of keySet() + get() lookups.
            for (Map.Entry<String, String> env : environment.entrySet()) {
                String v = env.getValue();
                if (v == null) {
                    v = "";
                }
                out.write(Utils.shellCmd(
                        Arrays.asList(
                                "export", env.getKey() + "=" + v)));
                out.write(";");
                out.newLine();
            }
        }
        out.newLine();
        out.write("exec " + Utils.shellCmd(command) + ";");
    }
    return path;
}
/**
 * A thread that can answer if it is sleeping in the case of simulated time.
 * This class is not useful when simulated time is not being used.
 */
public static class SmartThread extends Thread {
/** @return true when Time reports this thread as waiting (simulated time only). */
public boolean isSleeping() {
return Time.isThreadWaiting(this);
}
public SmartThread(Runnable r) {
super(r);
}
}
/**
 * Creates a thread that calls the given code repeatedly, sleeping for an
 * interval of seconds equal to the return value of the previous call.
 *
 * The given afn may be a callable that returns the number of seconds to
 * sleep, or it may be a Callable that returns another Callable that in turn
 * returns the number of seconds to sleep. In the latter case isFactory
 * should be true.
 *
 * @param afn the code to call on each iteration
 * @param isDaemon whether the new thread should be a daemon thread
 * @param eh code to call when afn throws an exception
 * @param priority the new thread's priority
 * @param isFactory whether afn returns a callable instead of sleep seconds
 * @param startImmediately whether to start the thread before returning
 * @param threadName a suffix to be appended to the thread name
 * @return the newly created thread
 * @see java.lang.Thread
 */
public static SmartThread asyncLoop(final Callable afn,
boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
int priority, final boolean isFactory, boolean startImmediately,
String threadName) {
SmartThread thread = new SmartThread(new Runnable() {
public void run() {
Object s;
try {
// Resolve the factory once, then loop until fn stops returning a Long.
Callable fn = isFactory ? (Callable) afn.call() : afn;
while ((s = fn.call()) instanceof Long) {
Time.sleepSecs((Long) s);
}
} catch (Throwable t) {
// Interruption is a normal shutdown path, not an error.
if (Utils.exceptionCauseIsInstanceOf(
InterruptedException.class, t)) {
LOG.info("Async loop interrupted!");
return;
}
LOG.error("Async loop died!", t);
throw new RuntimeException(t);
}
}
});
if (eh != null) {
thread.setUncaughtExceptionHandler(eh);
} else {
// Default handler: any escape from the loop is fatal for the process.
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Async loop died!", e);
Utils.exitProcess(1, "Async loop died!");
}
});
}
thread.setDaemon(isDaemon);
thread.setPriority(priority);
if (threadName != null && !threadName.isEmpty()) {
thread.setName(thread.getName() +"-"+ threadName);
}
if (startImmediately) {
thread.start();
}
return thread;
}
/**
 * Convenience overload: builds a non-daemon loop thread at normal priority
 * that starts immediately, using the given exception handler and thread
 * name suffix.
 *
 * @param afn the code to call on each iteration
 * @param threadName a suffix to be appended to the thread name
 * @param eh code to call when afn throws an exception
 * @return the newly created thread
 * @see java.lang.Thread
 */
public static SmartThread asyncLoop(final Callable afn, String threadName, final Thread.UncaughtExceptionHandler eh) {
    return asyncLoop(afn, false, eh, Thread.NORM_PRIORITY, false, true, threadName);
}
/**
 * Convenience overload: non-daemon, normal priority, default exception
 * handling, no name suffix; the thread is started before returning.
 *
 * @param afn the code to call on each iteration
 * @return the newly created thread
 */
public static SmartThread asyncLoop(final Callable afn) {
    return asyncLoop(afn, false, null, Thread.NORM_PRIORITY, false, true, null);
}
/**
 * Round-robin interleaves the given lists: first element of each list, then
 * second element of each list, and so on, skipping lists that run out.
 *
 * @param nodeList the lists to interleave
 * @return the interleaved elements, or null when nodeList is null or empty
 */
public static <T> List<T> interleaveAll(List<List<T>> nodeList) {
    if (nodeList == null || nodeList.isEmpty()) {
        return null;
    }
    List<T> result = new ArrayList<T>();
    // Each pass peels the head off every non-empty list; the tails form
    // the next pass. Iteration replaces the original recursion.
    List<List<T>> level = nodeList;
    while (!level.isEmpty()) {
        List<List<T>> next = new ArrayList<List<T>>();
        for (List<T> node : level) {
            if (node != null && !node.isEmpty()) {
                result.add(node.get(0));
                next.add(node.subList(1, node.size()));
            }
        }
        level = next;
    }
    return result;
}
/**
 * Bitwise XOR of two boxed longs.
 *
 * @param a first operand (must not be null)
 * @param b second operand (must not be null)
 * @return a ^ b as a primitive long
 */
public static long bitXor(Long a, Long b) {
    return a.longValue() ^ b.longValue();
}
/**
 * Returns every occurrence after the first of each id in the given list,
 * in encounter order. An id appearing three times contributes two entries
 * to the result.
 *
 * @param list the ids to scan for repeats
 * @return the repeated occurrences (empty when there are no duplicates)
 */
public static List<String> getRepeat(List<String> list) {
    List<String> rtn = new ArrayList<String>();
    Set<String> idSet = new HashSet<String>();
    for (String id : list) {
        // Set.add returns false when the id was already present, so one
        // hash lookup both records the id and detects the repeat
        // (replaces the contains() + add() double lookup).
        if (!idSet.add(id)) {
            rtn.add(id);
        }
    }
    return rtn;
}
/**
 * Converts a Clojure persistent map keyed by Keywords into a plain
 * java.util.HashMap keyed by the keywords' string names.
 *
 * @param map a Clojure map whose keys are {@code clojure.lang.Keyword}s
 * @return a new HashMap with keyword names (any leading ':' stripped) as keys
 */
public static Map<String, Object> convertClojureMapToJavaMap(Map map) {
    Map<String, Object> javaMap = new HashMap<>(map.size());
    for (Object rawEntry : map.entrySet()) {
        Map.Entry entry = (Map.Entry) rawEntry;
        String name = ((Keyword) entry.getKey()).getName();
        // Defensively drop a leading ':' should the keyword name carry one.
        if (name.startsWith(":")) {
            name = name.substring(1);
        }
        javaMap.put(name, entry.getValue());
    }
    return javaMap;
}
/**
 * Add version information to the given topology
 * @param topology the topology being submitted (MIGHT BE MODIFIED)
 * @return topology
 */
public static StormTopology addVersions(StormTopology topology) {
    String stormVersion = VersionInfo.getVersion();
    LOG.warn("STORM-VERSION new {} old {}", stormVersion, topology.get_storm_version());
    // Only fill in the storm version when the build reported a real one
    // and the submitter did not already set it.
    boolean versionUsable = stormVersion != null
            && !"Unknown".equalsIgnoreCase(stormVersion);
    if (versionUsable && !topology.is_set_storm_version()) {
        topology.set_storm_version(stormVersion);
    }
    // Likewise record the submitting JVM's version unless already set.
    String jdkVersion = System.getProperty("java.version");
    if (jdkVersion != null && !topology.is_set_jdk_version()) {
        topology.set_jdk_version(jdkVersion);
    }
    return topology;
}
}