HAMA-955: Support UnsafeByteArrayInputStream and UnSafeByteArrayOutputStream

git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1682806 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 8759b6d..e914bb1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,7 +25,8 @@
    HAMA-885: Semi-Clustering is not producing expected output (Renil J via edwardyoon)
 
   IMPROVEMENTS
-
+  
+   HAMA-955: Support UnsafeByteArrayInputStream and UnSafeByteArrayOutputStream (Minho Kim via edwardyoon)
    HAMA-944: Add JSON format option to fastgen command (Minho Kim via edwardyoon)
    HAMA-919: Manage messages per Vertex (edwardyoon)
    HAMA-923: add a toString() method for FloatArrayWritable and TextArrayWritable classes (edwardyoon)
diff --git a/core/src/main/java/org/apache/hama/util/ByteUtils.java b/core/src/main/java/org/apache/hama/util/ByteUtils.java
new file mode 100644
index 0000000..7b69854
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/util/ByteUtils.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+/**
+ * Utilities class for byte operations and constants
+ */
+public class ByteUtils {
+  /** Bytes used in a boolean */
+  public static final int SIZE_OF_BOOLEAN = 1;
+  /** Bytes used in a byte */
+  public static final int SIZE_OF_BYTE = 1;
+  /** Bytes used in a char */
+  public static final int SIZE_OF_CHAR = Character.SIZE / Byte.SIZE;
+  /** Bytes used in a short */
+  public static final int SIZE_OF_SHORT = Short.SIZE / Byte.SIZE;
+  /** Bytes used in an int */
+  public static final int SIZE_OF_INT = Integer.SIZE / Byte.SIZE;
+  /** Bytes used in a long */
+  public static final int SIZE_OF_LONG = Long.SIZE / Byte.SIZE;
+  /** Bytes used in a float */
+  public static final int SIZE_OF_FLOAT = Float.SIZE / Byte.SIZE;
+  /** Bytes used in a double */
+  public static final int SIZE_OF_DOUBLE = Double.SIZE / Byte.SIZE;
+}
diff --git a/core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java b/core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java
new file mode 100644
index 0000000..44bee8a
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.DataOutput;
+
+/**
+ * Add a few features to data output
+ */
+public interface ExtendedDataOutput extends DataOutput {
+  /**
+   * Ensure that backing byte structure has at least minSize
+   * additional bytes
+   *
+   * @param minSize additional size required
+   */
+  void ensureWritable(int minSize);
+
+  /**
+   * Skip some number of bytes.
+   *
+   * @param  bytesToSkip Number of bytes to skip
+   */
+  void skipBytes(int bytesToSkip);
+
+  /**
+   * In order to write a size as a first part of an data output, it is
+   * useful to be able to write an int at an arbitrary location in the stream
+   *
+   * @param pos Byte position in the output stream
+   * @param value Value to write
+   */
+  void writeInt(int pos, int value);
+
+  /**
+   * Get the position in the output stream
+   *
+   * @return Position in the output stream
+   */
+  int getPos();
+
+  /**
+   * Get the internal byte array (if possible), read-only
+   *
+   * @return Internal byte array (do not modify)
+   */
+  byte[] getByteArray();
+
+  /**
+   * Copies the internal byte array
+   *
+   * @return Copied byte array
+   */
+  byte[] toByteArray();
+
+  /**
+   * Return a copy of slice of byte array
+   *
+   * @param offset offset of array
+   * @param length length of slice
+   * @return byte array
+   */
+  byte[] toByteArray(int offset, int length);
+
+  /**
+   * Clears the buffer
+   */
+  void reset();
+}
diff --git a/core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java b/core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java
new file mode 100644
index 0000000..da05230
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+
+/**
+ * This class, much like {@link ByteArrayInputStream} uses a given buffer as a
+ * source of an InputStream. Unlike ByteArrayInputStream, this class does not
+ * "waste" memory by creating a local copy of the given buffer, but rather uses
+ * the given buffer as is. Hence the name Unsafe. While using this class one
+ * should remember that the byte[] buffer memory is shared and might be changed
+ * from outside.
+ *
+ * For reuse-ability, a call for {@link #reInit(byte[])} can be called, and
+ * initialize the stream with a new buffer.
+ *
+ */
+public class UnsafeByteArrayInputStream extends InputStream {
+  private byte[] buffer;
+  private int markIndex;
+  private int upperLimit;
+  private int index;
+
+  /**
+   * Creates a new instance by not using any byte[] up front. If you use this
+   * constructor, you MUST call either of the {@link #reInit(byte[]) reInit}
+   * methods before you consume any byte from this instance.<br>
+   * This constructor is for convenience purposes only, so that if one does not
+   * have the byte[] at the moment of creation, one is not forced to pass a
+   * <code>new byte[0]</code> or something. Obviously in that case, one will
+   * call either {@link #reInit(byte[]) reInit} methods before using the class.
+   */
+  public UnsafeByteArrayInputStream() {
+    markIndex = upperLimit = index = 0;
+  }
+
+  /**
+   * Creates an UnsafeByteArrayInputStream which uses a given byte array as the
+   * source of the stream. Default range is [0 , buffer.length)
+   *
+   * @param buffer byte array used as the source of this stream
+   */
+  public UnsafeByteArrayInputStream(byte[] buffer) {
+    reInit(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Creates an UnsafeByteArrayInputStream which uses a given byte array as the
+   * source of the stream, at the specific range: [startPos, endPos)
+   *
+   * @param buffer byte array used as the source of this stream
+   * @param startPos first index (inclusive) to the data lying in the given
+   *          buffer
+   * @param endPos an index (exclusive) where the data ends. data @
+   *          buffer[endPos] will never be read
+   */
+  public UnsafeByteArrayInputStream(byte[] buffer, int startPos, int endPos) {
+    reInit(buffer, startPos, endPos);
+  }
+
+  @Override
+  public void mark(int readlimit) {
+    markIndex = index;
+  }
+
+  @Override
+  public boolean markSupported() {
+    return true;
+  }
+
+  /**
+   * Initialize the stream with a given buffer, using the default limits of [0,
+   * buffer.length)
+   *
+   * @param buffer byte array used as the source of this stream
+   */
+  public void reInit(byte[] buffer) {
+    reInit(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Initialize the stream with a given byte array as the source of the stream,
+   * at the specific range: [startPos, endPos)
+   *
+   * @param buffer byte array used as the source of this stream
+   * @param startPos first index (inclusive) to the data lying in the given
+   *          buffer
+   * @param endPos an index (exclusive) where the data ends. data @
+   *          buffer[endPos] will never be read
+   */
+  public void reInit(byte[] buffer, int startPos, int endPos) {
+    this.buffer = buffer;
+    markIndex = startPos;
+    upperLimit = endPos;
+    index = markIndex;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return upperLimit - index;
+  }
+
+  /**
+   * Read a byte. Data returned as an integer [0,255] If end of stream reached,
+   * returns -1
+   */
+  @Override
+  public int read() throws IOException {
+    return index < upperLimit ? buffer[index++] & 0xff : -1;
+  }
+
+  /**
+   * Resets the stream back to its original state. Basically - moving the index
+   * back to start position.
+   */
+  @Override
+  public void reset() throws IOException {
+    index = markIndex;
+  }
+}
diff --git a/core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java b/core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java
new file mode 100644
index 0000000..c590a50
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+/**
+ * Byte array output stream that uses Unsafe methods to serialize/deserialize
+ * much faster
+ */
+@SuppressWarnings("restriction")
+public class UnsafeByteArrayOutputStream extends OutputStream implements
+    ExtendedDataOutput {
+  static {
+    try {
+      Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+      field.setAccessible(true);
+      UNSAFE = (sun.misc.Unsafe) field.get(null);
+      // Checkstyle exception due to needing to check if unsafe is allowed
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatch
+      throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to "
+          + "get unsafe", e);
+    }
+  }
+
+  /** Default number of bytes */
+  private static final int DEFAULT_BYTES = 32;
+  /** Access to the unsafe class */
+  private static final sun.misc.Unsafe UNSAFE;
+
+  /** Offset of a byte array */
+  private static final long BYTE_ARRAY_OFFSET = UNSAFE
+      .arrayBaseOffset(byte[].class);
+
+  /** Byte buffer */
+  private byte[] buf;
+  /** Position in the buffer */
+  private int pos = 0;
+
+  /**
+   * Constructor
+   */
+  public UnsafeByteArrayOutputStream() {
+    this(DEFAULT_BYTES);
+  }
+
+  /**
+   * Constructor
+   *
+   * @param size Initial size of the underlying byte array
+   */
+  public UnsafeByteArrayOutputStream(int size) {
+    buf = new byte[size];
+  }
+
+  /**
+   * Constructor to take in a buffer
+   *
+   * @param buf Buffer to start with, or if null, create own buffer
+   */
+  public UnsafeByteArrayOutputStream(byte[] buf) {
+    if (buf == null) {
+      this.buf = new byte[DEFAULT_BYTES];
+    } else {
+      this.buf = buf;
+    }
+  }
+
+  /**
+   * Constructor to take in a buffer with a given position into that buffer
+   *
+   * @param buf Buffer to start with
+   * @param pos Position to write at the buffer
+   */
+  public UnsafeByteArrayOutputStream(byte[] buf, int pos) {
+    this(buf);
+    this.pos = pos;
+  }
+
+  /**
+   * Ensure that this buffer has enough remaining space to add the size. Creates
+   * and copies to a new buffer if necessary
+   *
+   * @param size Size to add
+   */
+  private void ensureSize(int size) {
+    if (pos + size > buf.length) {
+      byte[] newBuf = new byte[(buf.length + size) << 1];
+      System.arraycopy(buf, 0, newBuf, 0, pos);
+      buf = newBuf;
+    }
+  }
+
+  @Override
+  public byte[] getByteArray() {
+    return buf;
+  }
+
+  @Override
+  public byte[] toByteArray() {
+    return Arrays.copyOf(buf, pos);
+  }
+
+  @Override
+  public byte[] toByteArray(int offset, int length) {
+    if (offset + length > pos) {
+      throw new IndexOutOfBoundsException(String.format("Offset: %d + "
+          + "Length: %d exceeds the size of buf : %d", offset, length, pos));
+    }
+    return Arrays.copyOfRange(buf, offset, length);
+  }
+
+  @Override
+  public void reset() {
+    pos = 0;
+  }
+
+  @Override
+  public int getPos() {
+    return pos;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    ensureSize(ByteUtils.SIZE_OF_BYTE);
+    buf[pos] = (byte) b;
+    pos += ByteUtils.SIZE_OF_BYTE;
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    ensureSize(b.length);
+    System.arraycopy(b, 0, buf, pos, b.length);
+    pos += b.length;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    ensureSize(len);
+    System.arraycopy(b, off, buf, pos, len);
+    pos += len;
+  }
+
+  @Override
+  public void writeBoolean(boolean v) throws IOException {
+    ensureSize(ByteUtils.SIZE_OF_BOOLEAN);
+    UNSAFE.putBoolean(buf, BYTE_ARRAY_OFFSET + pos, v);
+    pos += ByteUtils.SIZE_OF_BOOLEAN;
+  }
+
+  @Override
+  public void writeByte(int v) throws IOException {
+    ensureSize(ByteUtils.SIZE_OF_BYTE);
+    UNSAFE.putByte(buf, BYTE_ARRAY_OFFSET + pos, (byte) v);
+    pos += ByteUtils.SIZE_OF_BYTE;
+  }
+
+  @Override
+  public void writeShort(int v) throws IOException {
+    ensureSize(ByteUtils.SIZE_OF_SHORT);
+    UNSAFE.putShort(buf, BYTE_ARRAY_OFFSET + pos, (short) v);
+    pos += ByteUtils.SIZE_OF_SHORT;
+  }
+
+  @Override
+  public void writeChar(int v) throws IOException {
+    ensureSize(ByteUtils.SIZE_OF_CHAR);
+    UNSAFE.putChar(buf, BYTE_ARRAY_OFFSET + pos, (char) v);
+    pos += ByteUtils.SIZE_OF_CHAR;
+  }
+
+  @Override
+  public void writeInt(int v) throws IOException {
+    ensureSize(ByteUtils.SIZE_OF_INT);
+    UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, v);
+    pos += ByteUtils.SIZE_OF_INT;
+  }
+
+  @Override
+  public void ensureWritable(int minSize) {
+    if ((pos + minSize) > buf.length) {
+      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + minSize));
+    }
+  }
+
+  @Override
+  public void skipBytes(int bytesToSkip) {
+    ensureWritable(bytesToSkip);
+    pos += bytesToSkip;
+  }
+
+  @Override
+  public void writeInt(int pos, int value) {
+    if (pos + ByteUtils.SIZE_OF_INT > this.pos) {
+      throw new IndexOutOfBoundsException(
+          "writeInt: Tried to write int to position " + pos
+              + " but current length is " + this.pos);
+    }
+    UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, value);
+  }
+
+  @Override
+  public void writeLong(long v) throws IOException {
+    ensureSize(ByteUtils.SIZE_OF_LONG);
+    UNSAFE.putLong(buf, BYTE_ARRAY_OFFSET + pos, v);
+    pos += ByteUtils.SIZE_OF_LONG;
+  }
+
+  @Override
+  public void writeFloat(float v) throws IOException {
+    ensureSize(ByteUtils.SIZE_OF_FLOAT);
+    UNSAFE.putFloat(buf, BYTE_ARRAY_OFFSET + pos, v);
+    pos += ByteUtils.SIZE_OF_FLOAT;
+  }
+
+  @Override
+  public void writeDouble(double v) throws IOException {
+    ensureSize(ByteUtils.SIZE_OF_DOUBLE);
+    UNSAFE.putDouble(buf, BYTE_ARRAY_OFFSET + pos, v);
+    pos += ByteUtils.SIZE_OF_DOUBLE;
+  }
+
+  @Override
+  public void writeBytes(String s) throws IOException {
+    // Note that this code is mostly copied from DataOutputStream
+    int len = s.length();
+    ensureSize(len);
+    for (int i = 0; i < len; i++) {
+      int v = s.charAt(i);
+      writeByte(v);
+    }
+  }
+
+  @Override
+  public void writeChars(String s) throws IOException {
+    // Note that this code is mostly copied from DataOutputStream
+    int len = s.length();
+    ensureSize(len * ByteUtils.SIZE_OF_CHAR);
+    for (int i = 0; i < len; i++) {
+      int v = s.charAt(i);
+      writeChar(v);
+    }
+  }
+
+  @Override
+  public void writeUTF(String s) throws IOException {
+    // Note that this code is mostly copied from DataOutputStream
+    int strlen = s.length();
+    int utflen = 0;
+    int c;
+
+    /* use charAt instead of copying String to char array */
+    for (int i = 0; i < strlen; i++) {
+      c = s.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        utflen++;
+      } else if (c > 0x07FF) {
+        utflen += 3;
+      } else {
+        utflen += 2;
+      }
+    }
+
+    ensureSize(utflen + ByteUtils.SIZE_OF_SHORT);
+    writeShort(utflen);
+
+    int i = 0;
+    for (i = 0; i < strlen; i++) {
+      c = s.charAt(i);
+      if (!((c >= 0x0001) && (c <= 0x007F))) {
+        break;
+      }
+      buf[pos++] = (byte) c;
+    }
+
+    for (; i < strlen; i++) {
+      c = s.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        buf[pos++] = (byte) c;
+
+      } else if (c > 0x07FF) {
+        buf[pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+        buf[pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+        buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+      } else {
+        buf[pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+        buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/hama/util/WritableUtils.java b/core/src/main/java/org/apache/hama/util/WritableUtils.java
index 55accef..f55be7d 100644
--- a/core/src/main/java/org/apache/hama/util/WritableUtils.java
+++ b/core/src/main/java/org/apache/hama/util/WritableUtils.java
@@ -47,4 +47,24 @@
       e.printStackTrace();
     }
   }
+
+  public static byte[] unsafeSerialize(Writable w) {
+    UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
+    DataOutput output = new DataOutputStream(out);
+    try {
+      w.write(output);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    return out.toByteArray();
+  }
+
+  public static void unsafeDeserialize(byte[] bytes, Writable obj) {
+    DataInputStream in = new DataInputStream(new UnsafeByteArrayInputStream(bytes));
+    try {
+      obj.readFields(in);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
 }
diff --git a/graph/src/main/java/org/apache/hama/graph/GraphJob.java b/graph/src/main/java/org/apache/hama/graph/GraphJob.java
index b5663d2..31741f4 100644
--- a/graph/src/main/java/org/apache/hama/graph/GraphJob.java
+++ b/graph/src/main/java/org/apache/hama/graph/GraphJob.java
@@ -60,6 +60,7 @@
         OutgoingVertexMessageManager.class, OutgoingMessageManager.class);
     conf.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
     conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, false);
+    conf.setBoolean("hama.use.unsafeserialization", true);
     
     this.setBspClass(GraphJobRunner.class);
     this.setJarByClass(exampleClass);
diff --git a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
index e1f758b..7e2cc6d 100644
--- a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
+++ b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -49,6 +50,7 @@
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.commons.util.KeyValuePair;
 import org.apache.hama.util.ReflectionUtils;
+import org.apache.hama.util.UnsafeByteArrayInputStream;
 import org.apache.hama.util.WritableUtils;
 
 /**
@@ -112,7 +114,7 @@
   // global counter for thread exceptions
   // TODO find more graceful way to handle thread exceptions.
   private AtomicInteger errorCount = new AtomicInteger(0);
-  
+
   private AggregationRunner<V, E, M> aggregationRunner;
   private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
   private Combiner<Writable> combiner;
@@ -121,7 +123,8 @@
 
   private RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();
 
-  // Below maps are used for grouping messages into single GraphJobMessage, based on vertex ID.
+  // Below maps are used for grouping messages into single GraphJobMessage,
+  // based on vertex ID.
   private final ConcurrentHashMap<Integer, GraphJobMessage> partitionMessages = new ConcurrentHashMap<Integer, GraphJobMessage>();
   private final ConcurrentHashMap<V, GraphJobMessage> vertexMessages = new ConcurrentHashMap<V, GraphJobMessage>();
 
@@ -259,7 +262,7 @@
 
     ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
         .newCachedThreadPool();
-    executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
+    executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
     executor.setRejectedExecutionHandler(retryHandler);
 
     long loopStartTime = System.currentTimeMillis();
@@ -319,7 +322,7 @@
 
     ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
         .newCachedThreadPool();
-    executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
+    executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
     executor.setRejectedExecutionHandler(retryHandler);
 
     for (V v : vertices.keySet()) {
@@ -337,7 +340,7 @@
       throw new IOException("there were " + errorCount
           + " exceptions during compute vertices.");
     }
-    
+
     getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
     iteration++;
     finishSuperstep();
@@ -455,10 +458,11 @@
 
     ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
         .newCachedThreadPool();
-    executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
+    executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
     executor.setRejectedExecutionHandler(retryHandler);
 
     KeyValuePair<Writable, Writable> next = null;
+
     while ((next = peer.readNext()) != null) {
       Vertex<V, E, M> vertex = GraphJobRunner
           .<V, E, M> newVertexInstance(VERTEX_CLASS);
@@ -496,7 +500,7 @@
     peer.sync();
 
     executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
-    executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
+    executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
     executor.setRejectedExecutionHandler(retryHandler);
 
     GraphJobMessage msg;
@@ -695,26 +699,50 @@
     vertices.finishAdditions();
   }
 
-  public void sendMessage(V vertexID, byte[] msg) throws IOException {
+  public void sendMessage(V vertexID, M msg) throws IOException {
     if (!vertexMessages.containsKey(vertexID)) {
-      // To save bit memory we don't set vertexID twice
       vertexMessages.putIfAbsent(vertexID, new GraphJobMessage());
     }
-    vertexMessages.get(vertexID).add(msg);
+    if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+      vertexMessages.get(vertexID).add(WritableUtils.serialize(msg));
+    } else {
+      vertexMessages.get(vertexID).add(WritableUtils.unsafeSerialize(msg));
+    }
+  }
+
+  public void sendMessage(List<Edge<V, E>> outEdges, M msg) throws IOException {
+    byte[] serialized;
+    if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+      serialized = WritableUtils.serialize(msg);
+    } else {
+      serialized = WritableUtils.unsafeSerialize(msg);
+    }
+
+    for (Edge<V, E> e : outEdges) {
+      if (!vertexMessages.containsKey(e.getDestinationVertexID())) {
+        vertexMessages.putIfAbsent(e.getDestinationVertexID(),
+            new GraphJobMessage());
+      }
+
+      vertexMessages.get(e.getDestinationVertexID()).add(serialized);
+    }
   }
 
   public void finishSuperstep() throws IOException {
     vertices.finishSuperstep();
 
-    Iterator<Entry<V, GraphJobMessage>> it = vertexMessages.entrySet().iterator();
+    Iterator<Entry<V, GraphJobMessage>> it = vertexMessages.entrySet()
+        .iterator();
     while (it.hasNext()) {
       Entry<V, GraphJobMessage> e = it.next();
       it.remove();
 
       if (combiner != null && e.getValue().getNumOfValues() > 1) {
-        GraphJobMessage combined = new GraphJobMessage(e.getKey(),
+        GraphJobMessage combined;
+        combined = new GraphJobMessage(e.getKey(),
             WritableUtils.serialize(combiner.combine(getIterableMessages(e
                 .getValue().getValuesBytes(), e.getValue().getNumOfValues()))));
+
         combined.setFlag(GraphJobMessage.VERTEX_FLAG);
         peer.send(getHostName(e.getKey()), combined);
       } else {
@@ -732,13 +760,19 @@
 
   public Iterable<Writable> getIterableMessages(final byte[] valuesBytes,
       final int numOfValues) {
-
+    
     return new Iterable<Writable>() {
+      DataInputStream dis;
+      
       @Override
       public Iterator<Writable> iterator() {
+        if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+          dis = new DataInputStream(new ByteArrayInputStream(valuesBytes));
+        } else {
+          dis = new DataInputStream(new UnsafeByteArrayInputStream(valuesBytes));
+        }
+        
         return new Iterator<Writable>() {
-          ByteArrayInputStream bis = new ByteArrayInputStream(valuesBytes);
-          DataInputStream dis = new DataInputStream(bis);
           int index = 0;
 
           @Override
diff --git a/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java b/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
index b84cd96..730e07a 100644
--- a/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
+++ b/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
@@ -44,26 +44,36 @@
   private final ConcurrentHashMap<V, byte[]> vertices = new ConcurrentHashMap<V, byte[]>();
 
   private GraphJobRunner<V, E, M> runner;
-
+  private HamaConfiguration conf;
   private AtomicInteger activeVertices = new AtomicInteger(0);
 
   @Override
   public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
       TaskAttemptID attempt) throws IOException {
     this.runner = runner;
+    this.conf = conf;
   }
 
   @Override
   public void put(Vertex<V, E, M> vertex) throws IOException {
     if (!vertices.containsKey(vertex.getVertexID())) {
-      vertices.putIfAbsent(vertex.getVertexID(),
-          WritableUtils.serialize(vertex));
+      if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+        vertices.putIfAbsent(vertex.getVertexID(),
+            WritableUtils.serialize(vertex));
+      } else {
+        vertices.putIfAbsent(vertex.getVertexID(),
+            WritableUtils.unsafeSerialize(vertex));
+      }
     } else {
       Vertex<V, E, M> v = this.get(vertex.getVertexID());
       for (Edge<V, E> e : vertex.getEdges()) {
         v.addEdge(e);
       }
-      vertices.put(vertex.getVertexID(), WritableUtils.serialize(v));
+      if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+        vertices.put(vertex.getVertexID(), WritableUtils.serialize(v));
+      } else {
+        vertices.put(vertex.getVertexID(), WritableUtils.unsafeSerialize(v));
+      }
     }
   }
 
@@ -85,7 +95,11 @@
   public Vertex<V, E, M> get(V vertexID) throws IOException {
     Vertex<V, E, M> v = GraphJobRunner
         .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
-    WritableUtils.deserialize(vertices.get(vertexID), v);
+    if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+      WritableUtils.deserialize(vertices.get(vertexID), v);
+    } else {
+      WritableUtils.unsafeDeserialize(vertices.get(vertexID), v);
+    }
     v.setRunner(runner);
 
     return v;
@@ -107,7 +121,13 @@
       public Vertex<V, E, M> next() {
         Vertex<V, E, M> v = GraphJobRunner
             .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
-        WritableUtils.deserialize(it.next(), v);
+
+        if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+          WritableUtils.deserialize(it.next(), v);
+        } else {
+          WritableUtils.unsafeDeserialize(it.next(), v);
+        }
+
         v.setRunner(runner);
         return v;
       }
@@ -130,7 +150,11 @@
       throws IOException {
     incrementCount();
     vertex.setComputed();
-    vertices.put(vertex.getVertexID(), WritableUtils.serialize(vertex));
+    if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+      vertices.put(vertex.getVertexID(), WritableUtils.serialize(vertex));
+    } else {
+      vertices.put(vertex.getVertexID(), WritableUtils.unsafeSerialize(vertex));
+    }
   }
 
   public void incrementCount() {
diff --git a/graph/src/main/java/org/apache/hama/graph/Vertex.java b/graph/src/main/java/org/apache/hama/graph/Vertex.java
index 4d441f4..bf90187 100644
--- a/graph/src/main/java/org/apache/hama/graph/Vertex.java
+++ b/graph/src/main/java/org/apache/hama/graph/Vertex.java
@@ -30,7 +30,6 @@
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Counters.Counter;
-import org.apache.hama.util.WritableUtils;
 
 /**
  * Vertex is a abstract definition of Google Pregel Vertex. For implementing a
@@ -61,7 +60,7 @@
 
   private boolean votedToHalt = false;
   private long lastComputedSuperstep = 0;
-  
+
   public HamaConfiguration getConf() {
     return runner.getPeer().getConfiguration();
   }
@@ -80,22 +79,17 @@
 
   @Override
   public void sendMessage(Edge<V, E> e, M msg) throws IOException {
-    runner.sendMessage(e.getDestinationVertexID(),
-        WritableUtils.serialize(msg));
+    runner.sendMessage(e.getDestinationVertexID(), msg);
   }
 
   @Override
   public void sendMessage(V destinationVertexID, M msg) throws IOException {
-    runner.sendMessage(destinationVertexID, WritableUtils.serialize(msg));
+    runner.sendMessage(destinationVertexID, msg);
   }
 
   @Override
   public void sendMessageToNeighbors(M msg) throws IOException {
-    final List<Edge<V, E>> outEdges = this.getEdges();
-    byte[] serialized = WritableUtils.serialize(msg);
-    for (Edge<V, E> e : outEdges) {
-      runner.sendMessage(e.getDestinationVertexID(), serialized);
-    }
+    runner.sendMessage(this.getEdges(), msg);
   }
 
   private void alterVertexCounter(int i) throws IOException {
@@ -204,11 +198,11 @@
   void setComputed() {
     this.lastComputedSuperstep = this.getSuperstepCount();
   }
-  
+
   public boolean isComputed() {
     return (lastComputedSuperstep == this.getSuperstepCount()) ? true : false;
   }
-  
+
   void setVotedToHalt(boolean votedToHalt) {
     this.votedToHalt = votedToHalt;
   }
@@ -257,7 +251,7 @@
     }
 
     this.lastComputedSuperstep = in.readLong();
-    
+
     this.edges = new ArrayList<Edge<V, E>>();
     if (in.readBoolean()) {
       int num = in.readInt();
@@ -288,16 +282,16 @@
       out.writeBoolean(true);
       vertexID.write(out);
     }
-    
+
     if (value == null) {
       out.writeBoolean(false);
     } else {
       out.writeBoolean(true);
       value.write(out);
     }
-    
+
     out.writeLong(lastComputedSuperstep);
-    
+
     if (this.edges == null) {
       out.writeBoolean(false);
     } else {