HAMA-955: Support UnsafeByteArrayInputStream and UnSafeByteArrayOutputStream
git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1682806 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 8759b6d..e914bb1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,7 +25,8 @@
HAMA-885: Semi-Clustering is not producing expected output (Renil J via edwardyoon)
IMPROVEMENTS
-
+
+ HAMA-955: Support UnsafeByteArrayInputStream and UnsafeByteArrayOutputStream (Minho Kim via edwardyoon)
HAMA-944: Add JSON format option to fastgen command (Minho Kim via edwardyoon)
HAMA-919: Manage messages per Vertex (edwardyoon)
HAMA-923: add a toString() method for FloatArrayWritable and TextArrayWritable classes (edwardyoon)
diff --git a/core/src/main/java/org/apache/hama/util/ByteUtils.java b/core/src/main/java/org/apache/hama/util/ByteUtils.java
new file mode 100644
index 0000000..7b69854
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/util/ByteUtils.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+/**
+ * Holder for the size, in bytes, of each Java primitive type. The boolean
+ * and byte sizes are fixed at 1; every other constant is the wrapper type's
+ * {@code SIZE} (a bit count) divided by {@link Byte#SIZE}.
+ */
+public class ByteUtils {
+ /** Bytes used in a boolean */
+ public static final int SIZE_OF_BOOLEAN = 1;
+ /** Bytes used in a byte */
+ public static final int SIZE_OF_BYTE = 1;
+ /** Bytes used in a char */
+ public static final int SIZE_OF_CHAR = Character.SIZE / Byte.SIZE;
+ /** Bytes used in a short */
+ public static final int SIZE_OF_SHORT = Short.SIZE / Byte.SIZE;
+ /** Bytes used in an int */
+ public static final int SIZE_OF_INT = Integer.SIZE / Byte.SIZE;
+ /** Bytes used in a long */
+ public static final int SIZE_OF_LONG = Long.SIZE / Byte.SIZE;
+ /** Bytes used in a float */
+ public static final int SIZE_OF_FLOAT = Float.SIZE / Byte.SIZE;
+ /** Bytes used in a double */
+ public static final int SIZE_OF_DOUBLE = Double.SIZE / Byte.SIZE;
+}
diff --git a/core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java b/core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java
new file mode 100644
index 0000000..44bee8a
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.DataOutput;
+
+/**
+ * Extends {@link DataOutput} with operations for a buffer-backed output:
+ * reserving capacity, skipping ahead, overwriting an int at a known position
+ * and retrieving the bytes written so far.
+ */
+public interface ExtendedDataOutput extends DataOutput {
+ /**
+ * Ensure that the backing byte structure has room for at least minSize
+ * additional bytes.
+ *
+ * @param minSize additional size required
+ */
+ void ensureWritable(int minSize);
+
+ /**
+ * Skip some number of bytes.
+ *
+ * @param bytesToSkip Number of bytes to skip
+ */
+ void skipBytes(int bytesToSkip);
+
+ /**
+ * Write an int at an arbitrary position in the stream. This is useful for
+ * writing a size as the first part of a data output once that size is
+ * known.
+ *
+ * @param pos Byte position in the output stream
+ * @param value Value to write
+ */
+ void writeInt(int pos, int value);
+
+ /**
+ * Get the position in the output stream
+ *
+ * @return Position in the output stream
+ */
+ int getPos();
+
+ /**
+ * Get the internal byte array (if possible), read-only
+ *
+ * @return Internal byte array (do not modify)
+ */
+ byte[] getByteArray();
+
+ /**
+ * Copies the internal byte array
+ *
+ * @return Copied byte array
+ */
+ byte[] toByteArray();
+
+ /**
+ * Return a copy of a slice of the byte array.
+ *
+ * @param offset offset of the first byte of the slice
+ * @param length length of the slice, in bytes
+ * @return copy of the requested slice
+ */
+ byte[] toByteArray(int offset, int length);
+
+ /**
+ * Clears the buffer
+ */
+ void reset();
+}
diff --git a/core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java b/core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java
new file mode 100644
index 0000000..da05230
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+
+/**
+ * This class, much like {@link ByteArrayInputStream}, uses a given buffer as
+ * the source of an InputStream. The given buffer is used as is; no local copy
+ * is made and none of the methods declared here are synchronized. Hence the
+ * name Unsafe. While using this class one should remember that the byte[]
+ * buffer memory is shared and might be changed from outside.
+ *
+ * For reusability, {@link #reInit(byte[])} can be called to re-initialize the
+ * stream with a new buffer.
+ *
+ * Only the single-byte {@link #read()} is overridden; the array-based reads
+ * are inherited from {@link InputStream}.
+ */
+public class UnsafeByteArrayInputStream extends InputStream {
+ private byte[] buffer; // shared with the caller, never copied
+ private int markIndex; // position that reset() returns to
+ private int upperLimit; // exclusive end of the readable range
+ private int index; // position of the next byte to read
+
+ /**
+ * Creates a new instance by not using any byte[] up front. If you use this
+ * constructor, you MUST call either of the {@link #reInit(byte[]) reInit}
+ * methods before you consume any byte from this instance.<br>
+ * This constructor is for convenience purposes only, so that if one does not
+ * have the byte[] at the moment of creation, one is not forced to pass a
+ * <code>new byte[0]</code> or something. Obviously in that case, one will
+ * call either {@link #reInit(byte[]) reInit} methods before using the class.
+ * Until then all positions are 0, so the stream reports itself as empty.
+ */
+ public UnsafeByteArrayInputStream() {
+ markIndex = upperLimit = index = 0;
+ }
+
+ /**
+ * Creates an UnsafeByteArrayInputStream which uses a given byte array as the
+ * source of the stream. Default range is [0 , buffer.length)
+ *
+ * @param buffer byte array used as the source of this stream
+ */
+ public UnsafeByteArrayInputStream(byte[] buffer) {
+ reInit(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Creates an UnsafeByteArrayInputStream which uses a given byte array as the
+ * source of the stream, at the specific range: [startPos, endPos)
+ *
+ * @param buffer byte array used as the source of this stream
+ * @param startPos first index (inclusive) to the data lying in the given
+ * buffer
+ * @param endPos an index (exclusive) where the data ends. data @
+ * buffer[endPos] will never be read
+ */
+ public UnsafeByteArrayInputStream(byte[] buffer, int startPos, int endPos) {
+ reInit(buffer, startPos, endPos);
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ markIndex = index; // readlimit is ignored; only the current position is remembered
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ /**
+ * Initialize the stream with a given buffer, using the default limits of [0,
+ * buffer.length)
+ *
+ * @param buffer byte array used as the source of this stream
+ */
+ public void reInit(byte[] buffer) {
+ reInit(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Initialize the stream with a given byte array as the source of the stream,
+ * at the specific range: [startPos, endPos). The mark is set to startPos as
+ * well, and the range is not validated against the buffer.
+ *
+ * @param buffer byte array used as the source of this stream
+ * @param startPos first index (inclusive) to the data lying in the given
+ * buffer
+ * @param endPos an index (exclusive) where the data ends. data @
+ * buffer[endPos] will never be read
+ */
+ public void reInit(byte[] buffer, int startPos, int endPos) {
+ this.buffer = buffer;
+ markIndex = startPos;
+ upperLimit = endPos;
+ index = markIndex;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return upperLimit - index; // bytes left before the exclusive end of the range
+ }
+
+ /**
+ * Reads the next byte and advances the position.
+ *
+ * @return the byte as an integer in [0,255], or -1 if the end of the range
+ * has been reached
+ */
+ @Override
+ public int read() throws IOException {
+ return index < upperLimit ? buffer[index++] & 0xff : -1;
+ }
+
+ /**
+ * Moves the read position back to the marked position. If {@link #mark(int)}
+ * has not been called since the stream was (re)initialized, that is the
+ * start position of the range.
+ */
+ @Override
+ public void reset() throws IOException {
+ index = markIndex;
+ }
+}
diff --git a/core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java b/core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java
new file mode 100644
index 0000000..c590a50
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+/**
+ * Byte array output stream that uses Unsafe methods to serialize/deserialize
+ * much faster
+ */
+@SuppressWarnings("restriction")
+public class UnsafeByteArrayOutputStream extends OutputStream implements
+ ExtendedDataOutput {
+ static {
+ try {
+ Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+ field.setAccessible(true);
+ UNSAFE = (sun.misc.Unsafe) field.get(null);
+ // Checkstyle exception due to needing to check if unsafe is allowed
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to "
+ + "get unsafe", e);
+ }
+ }
+
+ /** Default number of bytes */
+ private static final int DEFAULT_BYTES = 32;
+ /** Access to the unsafe class */
+ private static final sun.misc.Unsafe UNSAFE;
+
+ /** Offset of a byte array */
+ private static final long BYTE_ARRAY_OFFSET = UNSAFE
+ .arrayBaseOffset(byte[].class);
+
+ /** Byte buffer */
+ private byte[] buf;
+ /** Position in the buffer */
+ private int pos = 0;
+
+ /**
+ * Constructor
+ */
+ public UnsafeByteArrayOutputStream() {
+ this(DEFAULT_BYTES);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param size Initial size of the underlying byte array
+ */
+ public UnsafeByteArrayOutputStream(int size) {
+ buf = new byte[size];
+ }
+
+ /**
+ * Constructor to take in a buffer
+ *
+ * @param buf Buffer to start with, or if null, create own buffer
+ */
+ public UnsafeByteArrayOutputStream(byte[] buf) {
+ if (buf == null) {
+ this.buf = new byte[DEFAULT_BYTES];
+ } else {
+ this.buf = buf;
+ }
+ }
+
+ /**
+ * Constructor to take in a buffer with a given position into that buffer
+ *
+ * @param buf Buffer to start with
+ * @param pos Position to write at the buffer
+ */
+ public UnsafeByteArrayOutputStream(byte[] buf, int pos) {
+ this(buf);
+ this.pos = pos;
+ }
+
+ /**
+ * Ensure that this buffer has enough remaining space to add the size. Creates
+ * and copies to a new buffer if necessary
+ *
+ * @param size Size to add
+ */
+ private void ensureSize(int size) {
+ if (pos + size > buf.length) {
+ byte[] newBuf = new byte[(buf.length + size) << 1];
+ System.arraycopy(buf, 0, newBuf, 0, pos);
+ buf = newBuf;
+ }
+ }
+
+ @Override
+ public byte[] getByteArray() {
+ return buf;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ return Arrays.copyOf(buf, pos);
+ }
+
+ @Override
+ public byte[] toByteArray(int offset, int length) {
+ if (offset + length > pos) {
+ throw new IndexOutOfBoundsException(String.format("Offset: %d + "
+ + "Length: %d exceeds the size of buf : %d", offset, length, pos));
+ }
+ return Arrays.copyOfRange(buf, offset, length);
+ }
+
+ @Override
+ public void reset() {
+ pos = 0;
+ }
+
+ @Override
+ public int getPos() {
+ return pos;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_BYTE);
+ buf[pos] = (byte) b;
+ pos += ByteUtils.SIZE_OF_BYTE;
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ ensureSize(b.length);
+ System.arraycopy(b, 0, buf, pos, b.length);
+ pos += b.length;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ ensureSize(len);
+ System.arraycopy(b, off, buf, pos, len);
+ pos += len;
+ }
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_BOOLEAN);
+ UNSAFE.putBoolean(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += ByteUtils.SIZE_OF_BOOLEAN;
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_BYTE);
+ UNSAFE.putByte(buf, BYTE_ARRAY_OFFSET + pos, (byte) v);
+ pos += ByteUtils.SIZE_OF_BYTE;
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_SHORT);
+ UNSAFE.putShort(buf, BYTE_ARRAY_OFFSET + pos, (short) v);
+ pos += ByteUtils.SIZE_OF_SHORT;
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_CHAR);
+ UNSAFE.putChar(buf, BYTE_ARRAY_OFFSET + pos, (char) v);
+ pos += ByteUtils.SIZE_OF_CHAR;
+ }
+
+ @Override
+ public void writeInt(int v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_INT);
+ UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += ByteUtils.SIZE_OF_INT;
+ }
+
+ @Override
+ public void ensureWritable(int minSize) {
+ if ((pos + minSize) > buf.length) {
+ buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + minSize));
+ }
+ }
+
+ @Override
+ public void skipBytes(int bytesToSkip) {
+ ensureWritable(bytesToSkip);
+ pos += bytesToSkip;
+ }
+
+ @Override
+ public void writeInt(int pos, int value) {
+ if (pos + ByteUtils.SIZE_OF_INT > this.pos) {
+ throw new IndexOutOfBoundsException(
+ "writeInt: Tried to write int to position " + pos
+ + " but current length is " + this.pos);
+ }
+ UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, value);
+ }
+
+ @Override
+ public void writeLong(long v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_LONG);
+ UNSAFE.putLong(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += ByteUtils.SIZE_OF_LONG;
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_FLOAT);
+ UNSAFE.putFloat(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += ByteUtils.SIZE_OF_FLOAT;
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_DOUBLE);
+ UNSAFE.putDouble(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += ByteUtils.SIZE_OF_DOUBLE;
+ }
+
+ @Override
+ public void writeBytes(String s) throws IOException {
+ // Note that this code is mostly copied from DataOutputStream
+ int len = s.length();
+ ensureSize(len);
+ for (int i = 0; i < len; i++) {
+ int v = s.charAt(i);
+ writeByte(v);
+ }
+ }
+
+ @Override
+ public void writeChars(String s) throws IOException {
+ // Note that this code is mostly copied from DataOutputStream
+ int len = s.length();
+ ensureSize(len * ByteUtils.SIZE_OF_CHAR);
+ for (int i = 0; i < len; i++) {
+ int v = s.charAt(i);
+ writeChar(v);
+ }
+ }
+
+ @Override
+ public void writeUTF(String s) throws IOException {
+ // Note that this code is mostly copied from DataOutputStream
+ int strlen = s.length();
+ int utflen = 0;
+ int c;
+
+ /* use charAt instead of copying String to char array */
+ for (int i = 0; i < strlen; i++) {
+ c = s.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ utflen++;
+ } else if (c > 0x07FF) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ }
+
+ ensureSize(utflen + ByteUtils.SIZE_OF_SHORT);
+ writeShort(utflen);
+
+ int i = 0;
+ for (i = 0; i < strlen; i++) {
+ c = s.charAt(i);
+ if (!((c >= 0x0001) && (c <= 0x007F))) {
+ break;
+ }
+ buf[pos++] = (byte) c;
+ }
+
+ for (; i < strlen; i++) {
+ c = s.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ buf[pos++] = (byte) c;
+
+ } else if (c > 0x07FF) {
+ buf[pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ buf[pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ } else {
+ buf[pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/hama/util/WritableUtils.java b/core/src/main/java/org/apache/hama/util/WritableUtils.java
index 55accef..f55be7d 100644
--- a/core/src/main/java/org/apache/hama/util/WritableUtils.java
+++ b/core/src/main/java/org/apache/hama/util/WritableUtils.java
@@ -47,4 +47,24 @@
e.printStackTrace();
}
}
+
+ public static byte[] unsafeSerialize(Writable w) { // serializes w into a new byte[] backed by UnsafeByteArrayOutputStream
+ UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
+ DataOutput output = new DataOutputStream(out); // w writes through this wrapper, not directly to out
+ try {
+ w.write(output);
+ } catch (IOException e) {
+ e.printStackTrace(); // the failure is only printed; whatever was written before it is still returned
+ }
+ return out.toByteArray(); // copy of the bytes written so far
+ }
+
+ public static void unsafeDeserialize(byte[] bytes, Writable obj) { // fills obj from bytes; the array is read in place
+ DataInputStream in = new DataInputStream(new UnsafeByteArrayInputStream(bytes));
+ try {
+ obj.readFields(in);
+ } catch (IOException e) {
+ e.printStackTrace(); // the failure is only printed; obj may be left partially populated
+ }
+ }
}
diff --git a/graph/src/main/java/org/apache/hama/graph/GraphJob.java b/graph/src/main/java/org/apache/hama/graph/GraphJob.java
index b5663d2..31741f4 100644
--- a/graph/src/main/java/org/apache/hama/graph/GraphJob.java
+++ b/graph/src/main/java/org/apache/hama/graph/GraphJob.java
@@ -60,6 +60,7 @@
OutgoingVertexMessageManager.class, OutgoingMessageManager.class);
conf.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, false);
+ conf.setBoolean("hama.use.unsafeserialization", true);
this.setBspClass(GraphJobRunner.class);
this.setJarByClass(exampleClass);
diff --git a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
index e1f758b..7e2cc6d 100644
--- a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
+++ b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -49,6 +50,7 @@
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.util.KeyValuePair;
import org.apache.hama.util.ReflectionUtils;
+import org.apache.hama.util.UnsafeByteArrayInputStream;
import org.apache.hama.util.WritableUtils;
/**
@@ -112,7 +114,7 @@
// global counter for thread exceptions
// TODO find more graceful way to handle thread exceptions.
private AtomicInteger errorCount = new AtomicInteger(0);
-
+
private AggregationRunner<V, E, M> aggregationRunner;
private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
private Combiner<Writable> combiner;
@@ -121,7 +123,8 @@
private RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();
- // Below maps are used for grouping messages into single GraphJobMessage, based on vertex ID.
+ // Below maps are used for grouping messages into single GraphJobMessage,
+ // based on vertex ID.
private final ConcurrentHashMap<Integer, GraphJobMessage> partitionMessages = new ConcurrentHashMap<Integer, GraphJobMessage>();
private final ConcurrentHashMap<V, GraphJobMessage> vertexMessages = new ConcurrentHashMap<V, GraphJobMessage>();
@@ -259,7 +262,7 @@
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
.newCachedThreadPool();
- executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
+ executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
executor.setRejectedExecutionHandler(retryHandler);
long loopStartTime = System.currentTimeMillis();
@@ -319,7 +322,7 @@
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
.newCachedThreadPool();
- executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
+ executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
executor.setRejectedExecutionHandler(retryHandler);
for (V v : vertices.keySet()) {
@@ -337,7 +340,7 @@
throw new IOException("there were " + errorCount
+ " exceptions during compute vertices.");
}
-
+
getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
iteration++;
finishSuperstep();
@@ -455,10 +458,11 @@
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
.newCachedThreadPool();
- executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
+ executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
executor.setRejectedExecutionHandler(retryHandler);
KeyValuePair<Writable, Writable> next = null;
+
while ((next = peer.readNext()) != null) {
Vertex<V, E, M> vertex = GraphJobRunner
.<V, E, M> newVertexInstance(VERTEX_CLASS);
@@ -496,7 +500,7 @@
peer.sync();
executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
- executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
+ executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
executor.setRejectedExecutionHandler(retryHandler);
GraphJobMessage msg;
@@ -695,26 +699,50 @@
vertices.finishAdditions();
}
- public void sendMessage(V vertexID, byte[] msg) throws IOException {
+ public void sendMessage(V vertexID, M msg) throws IOException {
if (!vertexMessages.containsKey(vertexID)) {
- // To save bit memory we don't set vertexID twice
vertexMessages.putIfAbsent(vertexID, new GraphJobMessage());
}
- vertexMessages.get(vertexID).add(msg);
+ if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+ vertexMessages.get(vertexID).add(WritableUtils.serialize(msg));
+ } else {
+ vertexMessages.get(vertexID).add(WritableUtils.unsafeSerialize(msg));
+ }
+ }
+
+ public void sendMessage(List<Edge<V, E>> outEdges, M msg) throws IOException { // queues msg for the destination vertex of every edge
+ byte[] serialized; // msg is serialized once; the same array is added for each destination
+ if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+ serialized = WritableUtils.serialize(msg);
+ } else {
+ serialized = WritableUtils.unsafeSerialize(msg);
+ }
+
+ for (Edge<V, E> e : outEdges) {
+ if (!vertexMessages.containsKey(e.getDestinationVertexID())) { // create the per-vertex message holder on first use
+ vertexMessages.putIfAbsent(e.getDestinationVertexID(),
+ new GraphJobMessage());
+ }
+
+ vertexMessages.get(e.getDestinationVertexID()).add(serialized);
+ }
+ }
public void finishSuperstep() throws IOException {
vertices.finishSuperstep();
- Iterator<Entry<V, GraphJobMessage>> it = vertexMessages.entrySet().iterator();
+ Iterator<Entry<V, GraphJobMessage>> it = vertexMessages.entrySet()
+ .iterator();
while (it.hasNext()) {
Entry<V, GraphJobMessage> e = it.next();
it.remove();
if (combiner != null && e.getValue().getNumOfValues() > 1) {
- GraphJobMessage combined = new GraphJobMessage(e.getKey(),
+ GraphJobMessage combined;
+ combined = new GraphJobMessage(e.getKey(),
WritableUtils.serialize(combiner.combine(getIterableMessages(e
.getValue().getValuesBytes(), e.getValue().getNumOfValues()))));
+
combined.setFlag(GraphJobMessage.VERTEX_FLAG);
peer.send(getHostName(e.getKey()), combined);
} else {
@@ -732,13 +760,19 @@
public Iterable<Writable> getIterableMessages(final byte[] valuesBytes,
final int numOfValues) {
-
+
return new Iterable<Writable>() {
+ DataInputStream dis;
+
@Override
public Iterator<Writable> iterator() {
+ if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+ dis = new DataInputStream(new ByteArrayInputStream(valuesBytes));
+ } else {
+ dis = new DataInputStream(new UnsafeByteArrayInputStream(valuesBytes));
+ }
+
return new Iterator<Writable>() {
- ByteArrayInputStream bis = new ByteArrayInputStream(valuesBytes);
- DataInputStream dis = new DataInputStream(bis);
int index = 0;
@Override
diff --git a/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java b/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
index b84cd96..730e07a 100644
--- a/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
+++ b/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
@@ -44,26 +44,36 @@
private final ConcurrentHashMap<V, byte[]> vertices = new ConcurrentHashMap<V, byte[]>();
private GraphJobRunner<V, E, M> runner;
-
+ private HamaConfiguration conf;
private AtomicInteger activeVertices = new AtomicInteger(0);
@Override
public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
TaskAttemptID attempt) throws IOException {
this.runner = runner;
+ this.conf = conf;
}
@Override
public void put(Vertex<V, E, M> vertex) throws IOException {
if (!vertices.containsKey(vertex.getVertexID())) {
- vertices.putIfAbsent(vertex.getVertexID(),
- WritableUtils.serialize(vertex));
+ if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+ vertices.putIfAbsent(vertex.getVertexID(),
+ WritableUtils.serialize(vertex));
+ } else {
+ vertices.putIfAbsent(vertex.getVertexID(),
+ WritableUtils.unsafeSerialize(vertex));
+ }
} else {
Vertex<V, E, M> v = this.get(vertex.getVertexID());
for (Edge<V, E> e : vertex.getEdges()) {
v.addEdge(e);
}
- vertices.put(vertex.getVertexID(), WritableUtils.serialize(v));
+ if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+ vertices.put(vertex.getVertexID(), WritableUtils.serialize(v));
+ } else {
+ vertices.put(vertex.getVertexID(), WritableUtils.unsafeSerialize(v));
+ }
}
}
@@ -85,7 +95,11 @@
public Vertex<V, E, M> get(V vertexID) throws IOException {
Vertex<V, E, M> v = GraphJobRunner
.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
- WritableUtils.deserialize(vertices.get(vertexID), v);
+ if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+ WritableUtils.deserialize(vertices.get(vertexID), v);
+ } else {
+ WritableUtils.unsafeDeserialize(vertices.get(vertexID), v);
+ }
v.setRunner(runner);
return v;
@@ -107,7 +121,13 @@
public Vertex<V, E, M> next() {
Vertex<V, E, M> v = GraphJobRunner
.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
- WritableUtils.deserialize(it.next(), v);
+
+ if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+ WritableUtils.deserialize(it.next(), v);
+ } else {
+ WritableUtils.unsafeDeserialize(it.next(), v);
+ }
+
v.setRunner(runner);
return v;
}
@@ -130,7 +150,11 @@
throws IOException {
incrementCount();
vertex.setComputed();
- vertices.put(vertex.getVertexID(), WritableUtils.serialize(vertex));
+ if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+ vertices.put(vertex.getVertexID(), WritableUtils.serialize(vertex));
+ } else {
+ vertices.put(vertex.getVertexID(), WritableUtils.unsafeSerialize(vertex));
+ }
}
public void incrementCount() {
diff --git a/graph/src/main/java/org/apache/hama/graph/Vertex.java b/graph/src/main/java/org/apache/hama/graph/Vertex.java
index 4d441f4..bf90187 100644
--- a/graph/src/main/java/org/apache/hama/graph/Vertex.java
+++ b/graph/src/main/java/org/apache/hama/graph/Vertex.java
@@ -30,7 +30,6 @@
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Counters.Counter;
-import org.apache.hama.util.WritableUtils;
/**
* Vertex is a abstract definition of Google Pregel Vertex. For implementing a
@@ -61,7 +60,7 @@
private boolean votedToHalt = false;
private long lastComputedSuperstep = 0;
-
+
public HamaConfiguration getConf() {
return runner.getPeer().getConfiguration();
}
@@ -80,22 +79,17 @@
@Override
public void sendMessage(Edge<V, E> e, M msg) throws IOException {
- runner.sendMessage(e.getDestinationVertexID(),
- WritableUtils.serialize(msg));
+ runner.sendMessage(e.getDestinationVertexID(), msg);
}
@Override
public void sendMessage(V destinationVertexID, M msg) throws IOException {
- runner.sendMessage(destinationVertexID, WritableUtils.serialize(msg));
+ runner.sendMessage(destinationVertexID, msg);
}
@Override
public void sendMessageToNeighbors(M msg) throws IOException {
- final List<Edge<V, E>> outEdges = this.getEdges();
- byte[] serialized = WritableUtils.serialize(msg);
- for (Edge<V, E> e : outEdges) {
- runner.sendMessage(e.getDestinationVertexID(), serialized);
- }
+ runner.sendMessage(this.getEdges(), msg);
}
private void alterVertexCounter(int i) throws IOException {
@@ -204,11 +198,11 @@
void setComputed() {
this.lastComputedSuperstep = this.getSuperstepCount();
}
-
+
public boolean isComputed() {
return (lastComputedSuperstep == this.getSuperstepCount()) ? true : false;
}
-
+
void setVotedToHalt(boolean votedToHalt) {
this.votedToHalt = votedToHalt;
}
@@ -257,7 +251,7 @@
}
this.lastComputedSuperstep = in.readLong();
-
+
this.edges = new ArrayList<Edge<V, E>>();
if (in.readBoolean()) {
int num = in.readInt();
@@ -288,16 +282,16 @@
out.writeBoolean(true);
vertexID.write(out);
}
-
+
if (value == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
value.write(out);
}
-
+
out.writeLong(lastComputedSuperstep);
-
+
if (this.edges == null) {
out.writeBoolean(false);
} else {