| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.utils |
| |
| import java.io._ |
| import java.nio._ |
| import charset.Charset |
| import java.nio.channels._ |
| import java.util.concurrent.locks.{ReadWriteLock, Lock} |
| import java.lang.management._ |
| import javax.management._ |
| import scala.collection._ |
| import scala.collection.mutable |
| import java.util.Properties |
| import kafka.common.KafkaException |
| import kafka.common.KafkaStorageException |
| |
| |
| /** |
| * General helper functions! |
| * |
| * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in |
| * the standard library etc. |
| * |
| * If you are making a new helper function and want to add it to this class please ensure the following: |
| * 1. It has documentation |
| * 2. It is the most general possible utility, not just the thing you needed in one particular place |
| * 3. You have tests for it if it is nontrivial in any way |
| */ |
| object Utils extends Logging { |
| |
| /** |
| * Wrap the given function in a java.lang.Runnable |
| * @param fun A function |
| * @return A Runnable that just executes the function |
| */ |
| def runnable(fun: => Unit): Runnable = |
| new Runnable { |
| def run() = fun |
| } |
| |
| /** |
| * Create a daemon thread |
| * @param runnable The runnable to execute in the background |
| * @return The unstarted thread |
| */ |
| def daemonThread(runnable: Runnable): Thread = |
| newThread(runnable, true) |
| |
| /** |
| * Create a daemon thread |
| * @param name The name of the thread |
| * @param runnable The runnable to execute in the background |
| * @return The unstarted thread |
| */ |
| def daemonThread(name: String, runnable: Runnable): Thread = |
| newThread(name, runnable, true) |
| |
| /** |
| * Create a daemon thread |
| * @param name The name of the thread |
| * @param fun The runction to execute in the thread |
| * @return The unstarted thread |
| */ |
| def daemonThread(name: String, fun: () => Unit): Thread = |
| daemonThread(name, runnable(fun)) |
| |
| /** |
| * Create a new thread |
| * @param name The name of the thread |
| * @param runnable The work for the thread to do |
| * @param daemon Should the thread block JVM shutdown? |
| * @return The unstarted thread |
| */ |
| def newThread(name: String, runnable: Runnable, daemon: Boolean): Thread = { |
| val thread = new Thread(runnable, name) |
| thread.setDaemon(daemon) |
| thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { |
| def uncaughtException(t: Thread, e: Throwable) { |
| error("Uncaught exception in thread '" + t.getName + "':", e) |
| } |
| }) |
| thread |
| } |
| |
| /** |
| * Create a new thread |
| * @param runnable The work for the thread to do |
| * @param daemon Should the thread block JVM shutdown? |
| * @return The unstarted thread |
| */ |
| def newThread(runnable: Runnable, daemon: Boolean): Thread = { |
| val thread = new Thread(runnable) |
| thread.setDaemon(daemon) |
| thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { |
| def uncaughtException(t: Thread, e: Throwable) { |
| error("Uncaught exception in thread '" + t.getName + "':", e) |
| } |
| }) |
| thread |
| } |
| |
| /** |
| * Read the given byte buffer into a byte array |
| */ |
| def readBytes(buffer: ByteBuffer): Array[Byte] = readBytes(buffer, 0, buffer.limit) |
| |
| /** |
| * Read a byte array from the given offset and size in the buffer |
| */ |
| def readBytes(buffer: ByteBuffer, offset: Int, size: Int): Array[Byte] = { |
| val dest = new Array[Byte](size) |
| if(buffer.hasArray) { |
| System.arraycopy(buffer.array, buffer.arrayOffset() + offset, dest, 0, size) |
| } else { |
| buffer.mark() |
| buffer.get(dest) |
| buffer.reset() |
| } |
| dest |
| } |
| |
| /** |
| * Read a properties file from the given path |
| * @param filename The path of the file to read |
| */ |
| def loadProps(filename: String): Properties = { |
| val props = new Properties() |
| var propStream: InputStream = null |
| try { |
| propStream = new FileInputStream(filename) |
| props.load(propStream) |
| } finally { |
| if(propStream != null) |
| propStream.close |
| } |
| props |
| } |
| |
| /** |
| * Open a channel for the given file |
| */ |
| def openChannel(file: File, mutable: Boolean): FileChannel = { |
| if(mutable) |
| new RandomAccessFile(file, "rw").getChannel() |
| else |
| new FileInputStream(file).getChannel() |
| } |
| |
| /** |
| * Do the given action and log any exceptions thrown without rethrowing them |
| * @param log The log method to use for logging. E.g. logger.warn |
| * @param action The action to execute |
| */ |
| def swallow(log: (Object, Throwable) => Unit, action: => Unit) { |
| try { |
| action |
| } catch { |
| case e: Throwable => log(e.getMessage(), e) |
| } |
| } |
| |
| /** |
| * Test if two byte buffers are equal. In this case equality means having |
| * the same bytes from the current position to the limit |
| */ |
| def equal(b1: ByteBuffer, b2: ByteBuffer): Boolean = { |
| // two byte buffers are equal if their position is the same, |
| // their remaining bytes are the same, and their contents are the same |
| if(b1.position != b2.position) |
| return false |
| if(b1.remaining != b2.remaining) |
| return false |
| for(i <- 0 until b1.remaining) |
| if(b1.get(i) != b2.get(i)) |
| return false |
| return true |
| } |
| |
| /** |
| * Translate the given buffer into a string |
| * @param buffer The buffer to translate |
| * @param encoding The encoding to use in translating bytes to characters |
| */ |
| def readString(buffer: ByteBuffer, encoding: String = Charset.defaultCharset.toString): String = { |
| val bytes = new Array[Byte](buffer.remaining) |
| buffer.get(bytes) |
| new String(bytes, encoding) |
| } |
| |
| /** |
| * Print an error message and shutdown the JVM |
| * @param message The error message |
| */ |
| def croak(message: String) { |
| System.err.println(message) |
| System.exit(1) |
| } |
| |
| /** |
| * Recursively delete the given file/directory and any subfiles (if any exist) |
| * @param file The root file at which to begin deleting |
| */ |
| def rm(file: String): Unit = rm(new File(file)) |
| |
| /** |
| * Recursively delete the list of files/directories and any subfiles (if any exist) |
| * @param a sequence of files to be deleted |
| */ |
| def rm(files: Seq[String]): Unit = files.map(f => rm(new File(f))) |
| |
| /** |
| * Recursively delete the given file/directory and any subfiles (if any exist) |
| * @param file The root file at which to begin deleting |
| */ |
| def rm(file: File) { |
| if(file == null) { |
| return |
| } else if(file.isDirectory) { |
| val files = file.listFiles() |
| if(files != null) { |
| for(f <- files) |
| rm(f) |
| } |
| file.delete() |
| } else { |
| file.delete() |
| } |
| } |
| |
| /** |
| * Register the given mbean with the platform mbean server, |
| * unregistering any mbean that was there before. Note, |
| * this method will not throw an exception if the registration |
| * fails (since there is nothing you can do and it isn't fatal), |
| * instead it just returns false indicating the registration failed. |
| * @param mbean The object to register as an mbean |
| * @param name The name to register this mbean with |
| * @return true if the registration succeeded |
| */ |
| def registerMBean(mbean: Object, name: String): Boolean = { |
| try { |
| val mbs = ManagementFactory.getPlatformMBeanServer() |
| mbs synchronized { |
| val objName = new ObjectName(name) |
| if(mbs.isRegistered(objName)) |
| mbs.unregisterMBean(objName) |
| mbs.registerMBean(mbean, objName) |
| true |
| } |
| } catch { |
| case e: Exception => { |
| error("Failed to register Mbean " + name, e) |
| false |
| } |
| } |
| } |
| |
| /** |
| * Unregister the mbean with the given name, if there is one registered |
| * @param name The mbean name to unregister |
| */ |
| def unregisterMBean(name: String) { |
| val mbs = ManagementFactory.getPlatformMBeanServer() |
| mbs synchronized { |
| val objName = new ObjectName(name) |
| if(mbs.isRegistered(objName)) |
| mbs.unregisterMBean(objName) |
| } |
| } |
| |
| /** |
| * Read an unsigned integer from the current position in the buffer, |
| * incrementing the position by 4 bytes |
| * @param buffer The buffer to read from |
| * @return The integer read, as a long to avoid signedness |
| */ |
| def readUnsignedInt(buffer: ByteBuffer): Long = |
| buffer.getInt() & 0xffffffffL |
| |
| /** |
| * Read an unsigned integer from the given position without modifying the buffers |
| * position |
| * @param buffer the buffer to read from |
| * @param index the index from which to read the integer |
| * @return The integer read, as a long to avoid signedness |
| */ |
| def readUnsignedInt(buffer: ByteBuffer, index: Int): Long = |
| buffer.getInt(index) & 0xffffffffL |
| |
| /** |
| * Write the given long value as a 4 byte unsigned integer. Overflow is ignored. |
| * @param buffer The buffer to write to |
| * @param value The value to write |
| */ |
| def writetUnsignedInt(buffer: ByteBuffer, value: Long): Unit = |
| buffer.putInt((value & 0xffffffffL).asInstanceOf[Int]) |
| |
| /** |
| * Write the given long value as a 4 byte unsigned integer. Overflow is ignored. |
| * @param buffer The buffer to write to |
| * @param index The position in the buffer at which to begin writing |
| * @param value The value to write |
| */ |
| def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit = |
| buffer.putInt(index, (value & 0xffffffffL).asInstanceOf[Int]) |
| |
| /** |
| * Compute the CRC32 of the byte array |
| * @param bytes The array to compute the checksum for |
| * @return The CRC32 |
| */ |
| def crc32(bytes: Array[Byte]): Long = crc32(bytes, 0, bytes.length) |
| |
| /** |
| * Compute the CRC32 of the segment of the byte array given by the specificed size and offset |
| * @param bytes The bytes to checksum |
| * @param offset the offset at which to begin checksumming |
| * @param size the number of bytes to checksum |
| * @return The CRC32 |
| */ |
| def crc32(bytes: Array[Byte], offset: Int, size: Int): Long = { |
| val crc = new Crc32() |
| crc.update(bytes, offset, size) |
| crc.getValue() |
| } |
| |
| /** |
| * Compute the hash code for the given items |
| */ |
| def hashcode(as: Any*): Int = { |
| if(as == null) |
| return 0 |
| var h = 1 |
| var i = 0 |
| while(i < as.length) { |
| if(as(i) != null) { |
| h = 31 * h + as(i).hashCode |
| i += 1 |
| } |
| } |
| return h |
| } |
| |
| /** |
| * Group the given values by keys extracted with the given function |
| */ |
| def groupby[K,V](vals: Iterable[V], f: V => K): Map[K,List[V]] = { |
| val m = new mutable.HashMap[K, List[V]] |
| for(v <- vals) { |
| val k = f(v) |
| m.get(k) match { |
| case Some(l: List[V]) => m.put(k, v :: l) |
| case None => m.put(k, List(v)) |
| } |
| } |
| m |
| } |
| |
| /** |
| * Read some bytes into the provided buffer, and return the number of bytes read. If the |
| * channel has been closed or we get -1 on the read for any reason, throw an EOFException |
| */ |
| def read(channel: ReadableByteChannel, buffer: ByteBuffer): Int = { |
| channel.read(buffer) match { |
| case -1 => throw new EOFException("Received -1 when reading from channel, socket has likely been closed.") |
| case n: Int => n |
| } |
| } |
| |
| /** |
| * Throw an exception if the given value is null, else return it. You can use this like: |
| * val myValue = Utils.notNull(expressionThatShouldntBeNull) |
| */ |
| def notNull[V](v: V) = { |
| if(v == null) |
| throw new KafkaException("Value cannot be null.") |
| else |
| v |
| } |
| |
| /** |
| * Get the stack trace from an exception as a string |
| */ |
| def stackTrace(e: Throwable): String = { |
| val sw = new StringWriter |
| val pw = new PrintWriter(sw) |
| e.printStackTrace(pw) |
| sw.toString() |
| } |
| |
| /** |
| * This method gets comma separated values which contains key,value pairs and returns a map of |
| * key value pairs. the format of allCSVal is key1:val1, key2:val2 .... |
| */ |
| def parseCsvMap(str: String): Map[String, String] = { |
| val map = new mutable.HashMap[String, String] |
| if("".equals(str)) |
| return map |
| val keyVals = str.split("\\s*,\\s*").map(s => s.split("\\s*:\\s*")) |
| keyVals.map(pair => (pair(0), pair(1))).toMap |
| } |
| |
| /** |
| * Parse a comma separated string into a sequence of strings. |
| * Whitespace surrounding the comma will be removed. |
| */ |
| def parseCsvList(csvList: String): Seq[String] = { |
| if(csvList == null || csvList.isEmpty) |
| Seq.empty[String] |
| else { |
| csvList.split("\\s*,\\s*").filter(v => !v.equals("")) |
| } |
| } |
| |
| /** |
| * Create an instance of the class with the given class name |
| */ |
| def createObject[T<:AnyRef](className: String, args: AnyRef*): T = { |
| val klass = Class.forName(className).asInstanceOf[Class[T]] |
| val constructor = klass.getConstructor(args.map(_.getClass): _*) |
| constructor.newInstance(args: _*).asInstanceOf[T] |
| } |
| |
| /** |
| * Is the given string null or empty ("")? |
| */ |
| def nullOrEmpty(s: String): Boolean = s == null || s.equals("") |
| |
| /** |
| * Create a circular (looping) iterator over a collection. |
| * @param coll An iterable over the underlying collection. |
| * @return A circular iterator over the collection. |
| */ |
| def circularIterator[T](coll: Iterable[T]) = { |
| val stream: Stream[T] = |
| for (forever <- Stream.continually(1); t <- coll) yield t |
| stream.iterator |
| } |
| |
| /** |
| * Attempt to read a file as a string |
| */ |
| def readFileAsString(path: String, charset: Charset = Charset.defaultCharset()): String = { |
| val stream = new FileInputStream(new File(path)) |
| try { |
| val fc = stream.getChannel() |
| val bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size()) |
| charset.decode(bb).toString() |
| } |
| finally { |
| stream.close() |
| } |
| } |
| |
| /** |
| * Get the absolute value of the given number. If the number is Int.MinValue return 0. |
| * This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!). |
| */ |
| def abs(n: Int) = if(n == Integer.MIN_VALUE) 0 else math.abs(n) |
| |
| /** |
| * Replace the given string suffix with the new suffix. If the string doesn't end with the given suffix throw an exception. |
| */ |
| def replaceSuffix(s: String, oldSuffix: String, newSuffix: String): String = { |
| if(!s.endsWith(oldSuffix)) |
| throw new IllegalArgumentException("Expected string to end with '%s' but string is '%s'".format(oldSuffix, s)) |
| s.substring(0, s.length - oldSuffix.length) + newSuffix |
| } |
| |
| /** |
| * Create a file with the given path |
| * @param path The path to create |
| * @throws KafkaStorageException If the file create fails |
| * @return The created file |
| */ |
| def createFile(path: String): File = { |
| val f = new File(path) |
| val created = f.createNewFile() |
| if(!created) |
| throw new KafkaStorageException("Failed to create file %s.".format(path)) |
| f |
| } |
| |
| /** |
| * Turn a properties map into a string |
| */ |
| def asString(props: Properties): String = { |
| val writer = new StringWriter() |
| props.store(writer, "") |
| writer.toString |
| } |
| |
| /** |
| * Read some properties with the given default values |
| */ |
| def readProps(s: String, defaults: Properties): Properties = { |
| val reader = new StringReader(s) |
| val props = new Properties(defaults) |
| props.load(reader) |
| props |
| } |
| |
| /** |
| * Read a big-endian integer from a byte array |
| */ |
| def readInt(bytes: Array[Byte], offset: Int): Int = { |
| ((bytes(offset) & 0xFF) << 24) | |
| ((bytes(offset + 1) & 0xFF) << 16) | |
| ((bytes(offset + 2) & 0xFF) << 8) | |
| (bytes(offset + 3) & 0xFF) |
| } |
| |
| /** |
| * Execute the given function inside the lock |
| */ |
| def inLock[T](lock: Lock)(fun: => T): T = { |
| lock.lock() |
| try { |
| fun |
| } finally { |
| lock.unlock() |
| } |
| } |
| |
| def inReadLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.readLock)(fun) |
| |
| def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.writeLock)(fun) |
| |
| |
| //JSON strings need to be escaped based on ECMA-404 standard http://json.org |
| def JSONEscapeString (s : String) : String = { |
| s.map { |
| case '"' => "\\\"" |
| case '\\' => "\\\\" |
| case '/' => "\\/" |
| case '\b' => "\\b" |
| case '\f' => "\\f" |
| case '\n' => "\\n" |
| case '\r' => "\\r" |
| case '\t' => "\\t" |
| /* We'll unicode escape any control characters. These include: |
| * 0x0 -> 0x1f : ASCII Control (C0 Control Codes) |
| * 0x7f : ASCII DELETE |
| * 0x80 -> 0x9f : C1 Control Codes |
| * |
| * Per RFC4627, section 2.5, we're not technically required to |
| * encode the C1 codes, but we do to be safe. |
| */ |
| case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int) |
| case c => c |
| }.mkString |
| } |
| |
| /** |
| * Returns a list of duplicated items |
| */ |
| def duplicates[T](s: Traversable[T]): Iterable[T] = { |
| s.groupBy(identity) |
| .map{ case (k,l) => (k,l.size)} |
| .filter{ case (k,l) => (l > 1) } |
| .keys |
| } |
| } |