blob: 9194d07c67a41f1837c7ea4b04a7c230cd99810f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.utils;
import backtype.storm.Config;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.ComponentObject;
import backtype.storm.generated.StormTopology;
import backtype.storm.serialization.DefaultSerializationDelegate;
import backtype.storm.serialization.SerializationDelegate;
import clojure.lang.IFn;
import clojure.lang.RT;
import com.alibaba.jstorm.utils.LoadConf;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.thrift.TException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import java.io.*;
import java.lang.reflect.Constructor;
import java.net.URL;
import java.net.URLClassLoader;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.*;
import java.util.Map.Entry;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class Utils {
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
public static final String DEFAULT_STREAM_ID = "default";
private static SerializationDelegate serializationDelegate;
static {
Map conf = readStormConfig();
serializationDelegate = getSerializationDelegate(conf);
}
public static Object newInstance(String klass) {
try {
Class c = Class.forName(klass);
return c.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static Object newInstance(String klass, Object... params) {
try {
Class c = Class.forName(klass);
Constructor[] constructors = c.getConstructors();
boolean found = false;
Constructor con = null;
for (Constructor cons : constructors) {
if (cons.getParameterTypes().length == params.length) {
con = cons;
break;
}
}
if (con == null) {
throw new RuntimeException("Cound not found the corresponding constructor, params=" + params.toString());
} else {
if (con.getParameterTypes().length == 0) {
return c.newInstance();
} else {
return con.newInstance(params);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Go thrift gzip serializer
*
* @param obj
* @return
*/
public static byte[] serialize(Object obj) {
/**
* @@@ JStorm disable the thrift.gz.serializer
*/
// return serializationDelegate.serialize(obj);
return javaSerialize(obj);
}
/**
* Go thrift gzip serializer
*
* @return
*/
public static <T> T deserialize(byte[] serialized, Class<T> clazz) {
/**
* @@@ JStorm disable the thrift.gz.serializer
*/
// return serializationDelegate.deserialize(serialized, clazz);
return (T) javaDeserialize(serialized);
}
public static byte[] javaSerialize(Object obj) {
if (obj instanceof byte[]) {
return (byte[]) obj;
}
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
oos.close();
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static Object maybe_deserialize(byte[] data) {
if (data == null || data.length == 0) {
return null;
}
try {
return javaDeserializeWithCL(data, null);
} catch (Exception e) {
return null;
}
}
/**
* Deserialized with ClassLoader
*
* @param serialized
* @param loader
* @return
*/
public static Object javaDeserializeWithCL(byte[] serialized, URLClassLoader loader) {
try {
ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
Object ret = null;
if (loader != null) {
ClassLoaderObjectInputStream cis = new ClassLoaderObjectInputStream(loader, bis);
ret = cis.readObject();
cis.close();
} else {
ObjectInputStream ois = new ObjectInputStream(bis);
ret = ois.readObject();
ois.close();
}
return ret;
} catch (IOException ioe) {
throw new RuntimeException(ioe);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
public static Object javaDeserialize(byte[] serialized) {
return javaDeserializeWithCL(serialized, WorkerClassLoader.getInstance());
}
public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz) {
return (T) javaDeserializeWithCL(serialized, WorkerClassLoader.getInstance());
}
public static String to_json(Object m) {
// return JSON.toJSONString(m);
return JSONValue.toJSONString(m);
}
public static Object from_json(String json) {
if (json == null) {
return null;
} else {
// return JSON.parse(json);
return JSONValue.parse(json);
}
}
public static String toPrettyJsonString(Object obj) {
Gson gson2 = new GsonBuilder().setPrettyPrinting().create();
String ret = gson2.toJson(obj);
return ret;
}
public static byte[] gzip(byte[] data) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream out = new GZIPOutputStream(bos);
out.write(data);
out.close();
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static byte[] gunzip(byte[] data) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ByteArrayInputStream bis = new ByteArrayInputStream(data);
GZIPInputStream in = new GZIPInputStream(bis);
byte[] buffer = new byte[1024];
int len = 0;
while ((len = in.read(buffer)) >= 0) {
bos.write(buffer, 0, len);
}
in.close();
bos.close();
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static <T> String join(Iterable<T> coll, String sep) {
Iterator<T> it = coll.iterator();
String ret = "";
while (it.hasNext()) {
ret = ret + it.next();
if (it.hasNext()) {
ret = ret + sep;
}
}
return ret;
}
public static void sleep(long millis) {
try {
Time.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* Please directly use LoadConf.findResources(name);
*
* @param name
* @return
*/
@Deprecated
public static List<URL> findResources(String name) {
return LoadConf.findResources(name);
}
/**
* Please directly use LoadConf.findAndReadYaml(name);
*
* @param name
* @return
*/
@Deprecated
public static Map findAndReadConfigFile(String name, boolean mustExist) {
return LoadConf.findAndReadYaml(name, mustExist, false);
}
public static Map findAndReadConfigFile(String name) {
return LoadConf.findAndReadYaml(name, true, false);
}
public static Map readDefaultConfig() {
return LoadConf.findAndReadYaml("defaults.yaml", true, false);
}
public static Map readCommandLineOpts() {
Map ret = new HashMap();
String commandOptions = System.getProperty("storm.options");
if (commandOptions != null) {
String[] configs = commandOptions.split(",");
for (String config : configs) {
config = URLDecoder.decode(config);
String[] options = config.split("=", 2);
if (options.length == 2) {
Object val = JSONValue.parse(options[1]);
if (val == null) {
val = options[1];
}
ret.put(options[0], val);
}
}
}
return ret;
}
public static void replaceLocalDir(Map<Object, Object> conf) {
String stormHome = System.getProperty("jstorm.home");
boolean isEmpty = StringUtils.isBlank(stormHome);
Map<Object, Object> replaceMap = new HashMap<Object, Object>();
for (Entry entry : conf.entrySet()) {
Object key = entry.getKey();
Object value = entry.getValue();
if (value instanceof String) {
if (StringUtils.isBlank((String) value) == true) {
continue;
}
String str = (String) value;
if (isEmpty == true) {
// replace %JSTORM_HOME% as current directory
str = str.replace("%JSTORM_HOME%", ".");
} else {
str = str.replace("%JSTORM_HOME%", stormHome);
}
replaceMap.put(key, str);
}
}
conf.putAll(replaceMap);
}
public static Map loadDefinedConf(String confFile) {
File file = new File(confFile);
if (file.exists() == false) {
return findAndReadConfigFile(confFile, true);
}
Yaml yaml = new Yaml();
Map ret;
try {
ret = (Map) yaml.load(new FileReader(file));
} catch (FileNotFoundException e) {
ret = null;
}
if (ret == null)
ret = new HashMap();
return new HashMap(ret);
}
public static Map readStormConfig() {
Map ret = readDefaultConfig();
String confFile = System.getProperty("storm.conf.file");
Map storm;
if (StringUtils.isBlank(confFile) == true) {
storm = findAndReadConfigFile("storm.yaml", false);
} else {
storm = loadDefinedConf(confFile);
}
ret.putAll(storm);
ret.putAll(readCommandLineOpts());
replaceLocalDir(ret);
return ret;
}
private static Object normalizeConf(Object conf) {
if (conf == null)
return new HashMap();
if (conf instanceof Map) {
Map confMap = new HashMap((Map) conf);
for (Object key : confMap.keySet()) {
Object val = confMap.get(key);
confMap.put(key, normalizeConf(val));
}
return confMap;
} else if (conf instanceof List) {
List confList = new ArrayList((List) conf);
for (int i = 0; i < confList.size(); i++) {
Object val = confList.get(i);
confList.set(i, normalizeConf(val));
}
return confList;
} else if (conf instanceof Integer) {
return ((Integer) conf).longValue();
} else if (conf instanceof Float) {
return ((Float) conf).doubleValue();
} else {
return conf;
}
}
public static boolean isValidConf(Map<String, Object> stormConf) {
return normalizeConf(stormConf).equals(normalizeConf(Utils.from_json(Utils.to_json(stormConf))));
}
public static Object getSetComponentObject(ComponentObject obj, URLClassLoader loader) {
if (obj.getSetField() == ComponentObject._Fields.SERIALIZED_JAVA) {
return javaDeserializeWithCL(obj.get_serialized_java(), loader);
} else if (obj.getSetField() == ComponentObject._Fields.JAVA_OBJECT) {
return obj.get_java_object();
} else {
return obj.get_shell();
}
}
public static <S, T> T get(Map<S, T> m, S key, T def) {
T ret = m.get(key);
if (ret == null) {
ret = def;
}
return ret;
}
public static List<Object> tuple(Object... values) {
List<Object> ret = new ArrayList<Object>();
for (Object v : values) {
ret.add(v);
}
return ret;
}
public static void downloadFromMaster(Map conf, String file, String localFile) throws IOException, TException {
WritableByteChannel out = null;
NimbusClient client = null;
try {
client = NimbusClient.getConfiguredClient(conf, 10 * 1000);
String id = client.getClient().beginFileDownload(file);
out = Channels.newChannel(new FileOutputStream(localFile));
while (true) {
ByteBuffer chunk = client.getClient().downloadChunk(id);
int written = out.write(chunk);
if (written == 0) {
client.getClient().finishFileDownload(id);
break;
}
}
} finally {
if (out != null)
out.close();
if (client != null)
client.close();
}
}
public static IFn loadClojureFn(String namespace, String name) {
try {
clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
} catch (Exception e) {
// if playing from the repl and defining functions, file won't exist
}
return (IFn) RT.var(namespace, name).deref();
}
public static boolean isSystemId(String id) {
return id.startsWith("__");
}
public static <K, V> Map<V, K> reverseMap(Map<K, V> map) {
Map<V, K> ret = new HashMap<V, K>();
for (K key : map.keySet()) {
ret.put(map.get(key), key);
}
return ret;
}
public static ComponentCommon getComponentCommon(StormTopology topology, String id) {
if (topology.get_spouts().containsKey(id)) {
return topology.get_spouts().get(id).get_common();
}
if (topology.get_bolts().containsKey(id)) {
return topology.get_bolts().get(id).get_common();
}
if (topology.get_state_spouts().containsKey(id)) {
return topology.get_state_spouts().get(id).get_common();
}
throw new IllegalArgumentException("Could not find component with id " + id);
}
public static Integer getInt(Object o) {
Integer result = getInt(o, null);
if (null == result) {
throw new IllegalArgumentException("Don't know how to convert null to int");
}
return result;
}
public static Integer getInt(Object o, Integer defaultValue) {
if (null == o) {
return defaultValue;
}
if (o instanceof Number) {
return ((Number) o).intValue();
} else if (o instanceof String) {
return Integer.parseInt(((String) o));
} else {
throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
}
}
public static long secureRandomLong() {
return UUID.randomUUID().getLeastSignificantBits();
}
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) {
return newCurator(conf, servers, port, root, null);
}
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
List<String> serverPorts = new ArrayList<String>();
for (String zkServer : (List<String>) servers) {
serverPorts.add(zkServer + ":" + Utils.getInt(port));
}
String zkStr = StringUtils.join(serverPorts, ",") + root;
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
setupBuilder(builder, zkStr, conf, auth);
return builder.build();
}
protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth) {
builder.connectString(zkStr)
.connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
.sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
.retryPolicy(
new StormBoundedExponentialBackoffRetry(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)), Utils.getInt(conf
.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES))));
if (auth != null && auth.scheme != null && auth.payload != null) {
builder = builder.authorization(auth.scheme, auth.payload);
}
}
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
return newCurator(conf, servers, port, "", auth);
}
public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
CuratorFramework ret = newCurator(conf, servers, port, root, auth);
ret.start();
return ret;
}
public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
CuratorFramework ret = newCurator(conf, servers, port, auth);
ret.start();
return ret;
}
/**
*
(defn integer-divided [sum num-pieces] (let [base (int (/ sum num-pieces)) num-inc (mod sum num-pieces) num-bases (- num-pieces num-inc)] (if (= num-inc
* 0) {base num-bases} {base num-bases (inc base) num-inc} )))
*
* @param sum
* @param numPieces
* @return
*/
public static TreeMap<Integer, Integer> integerDivided(int sum, int numPieces) {
int base = sum / numPieces;
int numInc = sum % numPieces;
int numBases = numPieces - numInc;
TreeMap<Integer, Integer> ret = new TreeMap<Integer, Integer>();
ret.put(base, numBases);
if (numInc != 0) {
ret.put(base + 1, numInc);
}
return ret;
}
public static byte[] toByteArray(ByteBuffer buffer) {
byte[] ret = new byte[buffer.remaining()];
buffer.get(ret, 0, ret.length);
return ret;
}
public static void readAndLogStream(String prefix, InputStream in) {
try {
BufferedReader r = new BufferedReader(new InputStreamReader(in));
String line = null;
while ((line = r.readLine()) != null) {
LOG.info("{}:{}", prefix, line);
}
} catch (IOException e) {
LOG.warn("Error whiel trying to log stream", e);
}
}
public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable) {
Throwable t = throwable;
while (t != null) {
if (klass.isInstance(t)) {
return true;
}
t = t.getCause();
}
return false;
}
/**
* Is the cluster configured to interact with ZooKeeper in a secure way? This only works when called from within Nimbus or a Supervisor process.
*
* @param conf the storm configuration, not the topology configuration
* @return true if it is configured else false.
*/
public static boolean isZkAuthenticationConfiguredStormServer(Map conf) {
return null != System.getProperty("java.security.auth.login.config")
|| (conf != null && conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null && !((String) conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
}
/**
* Is the topology configured to have ZooKeeper authentication.
*
* @param conf the topology configuration
* @return true if ZK is configured else false
*/
public static boolean isZkAuthenticationConfiguredTopology(Map conf) {
return (conf != null && conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null && !((String) conf
.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
}
public static List<ACL> getWorkerACL(Map conf) {
// This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct
// perms
if (!isZkAuthenticationConfiguredTopology(conf)) {
return null;
}
String stormZKUser = (String) conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
if (stormZKUser == null) {
throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set");
}
String[] split = stormZKUser.split(":", 2);
if (split.length != 2) {
throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
}
ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
return ret;
}
public static String threadDump() {
final StringBuilder dump = new StringBuilder();
final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean();
final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
dump.append('"');
dump.append(threadInfo.getThreadName());
dump.append("\" ");
final Thread.State state = threadInfo.getThreadState();
dump.append("\n java.lang.Thread.State: ");
dump.append(state);
final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
for (final StackTraceElement stackTraceElement : stackTraceElements) {
dump.append("\n at ");
dump.append(stackTraceElement);
}
dump.append("\n\n");
}
return dump.toString();
}
// Assumes caller is synchronizing
private static SerializationDelegate getSerializationDelegate(Map stormConf) {
String delegateClassName = (String) stormConf.get(Config.STORM_META_SERIALIZATION_DELEGATE);
SerializationDelegate delegate;
try {
Class delegateClass = Class.forName(delegateClassName);
delegate = (SerializationDelegate) delegateClass.newInstance();
} catch (ClassNotFoundException e) {
LOG.error("Failed to construct serialization delegate, falling back to default", e);
delegate = new DefaultSerializationDelegate();
} catch (InstantiationException e) {
LOG.error("Failed to construct serialization delegate, falling back to default", e);
delegate = new DefaultSerializationDelegate();
} catch (IllegalAccessException e) {
LOG.error("Failed to construct serialization delegate, falling back to default", e);
delegate = new DefaultSerializationDelegate();
}
delegate.prepare(stormConf);
return delegate;
}
public static void handleUncaughtException(Throwable t) {
if (t != null && t instanceof Error) {
if (t instanceof OutOfMemoryError) {
try {
System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
} catch (Throwable err) {
// Again we don't want to exit because of logging issues.
}
Runtime.getRuntime().halt(-1);
} else {
// Running in daemon mode, we would pass Error to calling thread.
throw (Error) t;
}
}
}
public static List<String> tokenize_path(String path) {
String[] toks = path.split("/");
ArrayList<String> rtn = new ArrayList<String>();
for (String str : toks) {
if (!str.isEmpty()) {
rtn.add(str);
}
}
return rtn;
}
public static String toks_to_path(List<String> toks) {
StringBuffer buff = new StringBuffer();
buff.append("/");
int size = toks.size();
for (int i = 0; i < size; i++) {
buff.append(toks.get(i));
if (i < (size - 1)) {
buff.append("/");
}
}
return buff.toString();
}
public static String normalize_path(String path) {
String rtn = toks_to_path(tokenize_path(path));
return rtn;
}
public static String printStack() {
StringBuilder sb = new StringBuilder();
sb.append("\nCurrent call stack:\n");
StackTraceElement[] stackElements = Thread.currentThread().getStackTrace();
for (int i = 2; i < stackElements.length; i++) {
sb.append("\t").append(stackElements[i]).append("\n");
}
return sb.toString();
}
private static Map loadProperty(String prop) {
Map ret = new HashMap<Object, Object>();
Properties properties = new Properties();
try {
InputStream stream = new FileInputStream(prop);
properties.load(stream);
if (properties.size() == 0) {
System.out.println("WARN: Config file is empty");
return null;
} else {
ret.putAll(properties);
}
} catch (FileNotFoundException e) {
System.out.println("No such file " + prop);
throw new RuntimeException(e.getMessage());
} catch (Exception e1) {
e1.printStackTrace();
throw new RuntimeException(e1.getMessage());
}
return ret;
}
private static Map loadYaml(String confPath) {
Map ret = new HashMap<Object, Object>();
Yaml yaml = new Yaml();
try {
InputStream stream = new FileInputStream(confPath);
ret = (Map) yaml.load(stream);
if (ret == null || ret.isEmpty() == true) {
System.out.println("WARN: Config file is empty");
return null;
}
} catch (FileNotFoundException e) {
System.out.println("No such file " + confPath);
throw new RuntimeException("No config file");
} catch (Exception e1) {
e1.printStackTrace();
throw new RuntimeException("Failed to read config file");
}
return ret;
}
public static Map loadConf(String arg) {
Map ret = null;
if (arg.endsWith("yaml")) {
ret = loadYaml(arg);
} else {
ret = loadProperty(arg);
}
return ret;
}
public static String getVersion() {
String ret = "";
InputStream input = null;
try {
input = Utils.class.getClassLoader().getResourceAsStream("version");
BufferedReader in = new BufferedReader(new InputStreamReader(input));
String s = in.readLine();
ret = s.trim();
} catch (Exception e) {
LOG.warn("Failed to get version", e);
} finally {
if (input != null) {
try {
input.close();
} catch (Exception e) {
LOG.error("Failed to close the reader of RELEASE", e);
}
}
}
return ret;
}
public static void writeIntToByteArray(byte[] bytes, int offset, int value) {
bytes[offset++] = (byte) (value & 0x000000FF);
bytes[offset++] = (byte) ((value & 0x0000FF00) >> 8);
bytes[offset++] = (byte) ((value & 0x00FF0000) >> 16);
bytes[offset] = (byte) ((value & 0xFF000000) >> 24);
}
public static int readIntFromByteArray(byte[] bytes, int offset) {
int ret = 0;
ret = ret | (bytes[offset++] & 0x000000FF);
ret = ret | ((bytes[offset++] << 8) & 0x0000FF00);
ret = ret | ((bytes[offset++] << 16) & 0x00FF0000);
ret = ret | ((bytes[offset] << 24) & 0xFF000000);
return ret;
}
}