GIRAPH-1181
closes #65
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
index e4f782e..20ce426 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
@@ -19,8 +19,8 @@
package org.apache.giraph.comm.aggregators;
import java.io.IOException;
-import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
/**
* Wrapper for output stream which keeps the place in the beginning for the
@@ -36,7 +36,7 @@
* Default constructor
*/
public CountingOutputStream() {
- dataOutput = new ExtendedByteArrayDataOutput();
+ dataOutput = new UnsafeByteArrayOutputStream();
reset();
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
index 7107228..d372bf2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
@@ -18,6 +18,8 @@
package org.apache.giraph.comm.requests;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
@@ -64,6 +66,14 @@
return new DataInputStream(new ByteArrayInputStream(data));
}
+ /**
+ * Wraps the byte array with UnsafeByteArrayInputStream stream.
+ * @return UnsafeByteArrayInputStream
+ */
+ public UnsafeByteArrayInputStream getUnsafeByteArrayInput() {
+ return new UnsafeByteArrayInputStream(data);
+ }
+
@Override
void readFieldsRequest(DataInput input) throws IOException {
int dataLength = input.readInt();
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
index 8f168a2..de54188 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
@@ -18,13 +18,13 @@
package org.apache.giraph.comm.requests;
-import java.io.DataInput;
import java.io.IOException;
import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
import org.apache.giraph.utils.WritableUtils;
@@ -59,7 +59,7 @@
UnsafeByteArrayOutputStream reusedOut = new UnsafeByteArrayOutputStream();
UnsafeReusableByteArrayInput reusedIn = new UnsafeReusableByteArrayInput();
- DataInput input = getDataInput();
+ UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
try {
int num = input.readInt();
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
index 361bdc9..ee7ac72 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
@@ -18,12 +18,12 @@
package org.apache.giraph.comm.requests;
-import java.io.DataInput;
import java.io.IOException;
import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
@@ -53,7 +53,7 @@
@Override
public void doRequest(ServerData serverData) {
- DataInput input = getDataInput();
+ UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
try {
int num = input.readInt();
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
index 3a1bd64..0ea737f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java
@@ -47,7 +47,8 @@
@Override
public void doRequest(MasterGlobalCommHandler commHandler) {
try {
- commHandler.getAggregatorHandler().acceptReducedValues(getDataInput());
+ commHandler.getAggregatorHandler().
+ acceptReducedValues(getUnsafeByteArrayInput());
} catch (IOException e) {
throw new IllegalStateException("doRequest: " +
"IOException occurred while processing request", e);
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
index 2f76e6e..7164cb2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
@@ -18,12 +18,12 @@
package org.apache.giraph.comm.requests;
-import java.io.DataInput;
import java.io.IOException;
import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
@@ -52,7 +52,7 @@
@Override
public void doRequest(ServerData serverData) {
- DataInput input = getDataInput();
+ UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
OwnerAggregatorServerData aggregatorData =
serverData.getOwnerAggregatorData();
try {
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
index c5587e1..2b91502 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
@@ -54,9 +54,6 @@
private static final long BYTE_ARRAY_OFFSET =
UNSAFE.arrayBaseOffset(byte[].class);
- /** Byte buffer */
- protected byte[] buf;
-
/**
* Constructor
*
@@ -64,7 +61,7 @@
*/
public UnsafeArrayReads(byte[] buf) {
super(buf.length);
- this.buf = buf;
+ this.buffer = buf;
}
/**
@@ -76,12 +73,12 @@
*/
public UnsafeArrayReads(byte[] buf, int offset, int length) {
super(offset, length);
- this.buf = buf;
+ this.buffer = buf;
}
@Override
public int available() {
- return (int) (bufLength - pos);
+ return (int) (limit - position);
}
@Override
@@ -92,38 +89,38 @@
@Override
public int getPos() {
- return (int) pos;
+ return (int) position;
}
@Override
public void readFully(byte[] b) throws IOException {
- ensureRemaining(b.length);
- System.arraycopy(buf, (int) pos, b, 0, b.length);
- pos += b.length;
+ require(b.length);
+ System.arraycopy(buffer, (int) position, b, 0, b.length);
+ position += b.length;
}
@Override
public void readFully(byte[] b, int off, int len) throws IOException {
- ensureRemaining(len);
- System.arraycopy(buf, (int) pos, b, off, len);
- pos += len;
+ require(len);
+ System.arraycopy(buffer, (int) position, b, off, len);
+ position += len;
}
@Override
- public boolean readBoolean() throws IOException {
- ensureRemaining(SIZE_OF_BOOLEAN);
- boolean value = UNSAFE.getBoolean(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += SIZE_OF_BOOLEAN;
+ public boolean readBoolean() {
+ require(SIZE_OF_BOOLEAN);
+ boolean value = UNSAFE.getBoolean(buffer,
+ BYTE_ARRAY_OFFSET + position);
+ position += SIZE_OF_BOOLEAN;
return value;
}
@Override
- public byte readByte() throws IOException {
- ensureRemaining(SIZE_OF_BYTE);
- byte value = UNSAFE.getByte(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += SIZE_OF_BYTE;
+ public byte readByte() {
+ require(SIZE_OF_BYTE);
+ byte value = UNSAFE.getByte(buffer,
+ BYTE_ARRAY_OFFSET + position);
+ position += SIZE_OF_BYTE;
return value;
}
@@ -133,11 +130,11 @@
}
@Override
- public short readShort() throws IOException {
- ensureRemaining(SIZE_OF_SHORT);
- short value = UNSAFE.getShort(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += SIZE_OF_SHORT;
+ public short readShort() {
+ require(SIZE_OF_SHORT);
+ short value = UNSAFE.getShort(buffer,
+ BYTE_ARRAY_OFFSET + position);
+ position += SIZE_OF_SHORT;
return value;
}
@@ -147,47 +144,47 @@
}
@Override
- public char readChar() throws IOException {
- ensureRemaining(SIZE_OF_CHAR);
- char value = UNSAFE.getChar(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += SIZE_OF_CHAR;
+ public char readChar() {
+ require(SIZE_OF_CHAR);
+ char value = UNSAFE.getChar(buffer,
+ BYTE_ARRAY_OFFSET + position);
+ position += SIZE_OF_CHAR;
return value;
}
@Override
- public int readInt() throws IOException {
- ensureRemaining(SIZE_OF_INT);
- int value = UNSAFE.getInt(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += SIZE_OF_INT;
+ public int readInt() {
+ require(SIZE_OF_INT);
+ int value = UNSAFE.getInt(buffer,
+ BYTE_ARRAY_OFFSET + position);
+ position += SIZE_OF_INT;
return value;
}
@Override
- public long readLong() throws IOException {
- ensureRemaining(SIZE_OF_LONG);
- long value = UNSAFE.getLong(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += SIZE_OF_LONG;
+ public long readLong() {
+ require(SIZE_OF_LONG);
+ long value = UNSAFE.getLong(buffer,
+ BYTE_ARRAY_OFFSET + position);
+ position += SIZE_OF_LONG;
return value;
}
@Override
- public float readFloat() throws IOException {
- ensureRemaining(SIZE_OF_FLOAT);
- float value = UNSAFE.getFloat(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += SIZE_OF_FLOAT;
+ public float readFloat() {
+ require(SIZE_OF_FLOAT);
+ float value = UNSAFE.getFloat(buffer,
+ BYTE_ARRAY_OFFSET + position);
+ position += SIZE_OF_FLOAT;
return value;
}
@Override
- public double readDouble() throws IOException {
- ensureRemaining(SIZE_OF_DOUBLE);
- double value = UNSAFE.getDouble(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += SIZE_OF_DOUBLE;
+ public double readDouble() {
+ require(SIZE_OF_DOUBLE);
+ double value = UNSAFE.getDouble(buffer,
+ BYTE_ARRAY_OFFSET + position);
+ position += SIZE_OF_DOUBLE;
return value;
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
index c8a8cac..b9b1995 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
@@ -20,6 +20,13 @@
/**
* UnsafeByteArrayInputStream
+ *
+ * This stream now extends com.esotericsoftware.kryo.io.Input so that kryo
+ * serialization can directly read from this stream without using an
+ * additional buffer, providing a faster serialization.
+
+ * Users of this class has to explicitly close the stream to avoid style check
+ * errors even though close is no-op when the underlying stream is not set.
*/
public class UnsafeByteArrayInputStream extends UnsafeArrayReads {
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
index 27f1156..13bc7d2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
@@ -17,8 +17,9 @@
*/
package org.apache.giraph.utils;
+import com.esotericsoftware.kryo.io.Output;
+
import java.io.IOException;
-import java.io.OutputStream;
import java.io.UTFDataFormatException;
import java.lang.reflect.Field;
import java.util.Arrays;
@@ -34,9 +35,16 @@
/**
* Byte array output stream that uses Unsafe methods to serialize/deserialize
- * much faster
+ * much faster.
+ *
+ * This stream now extends com.esotericsoftware.kryo.io.Output so that kryo
+ * serialization can directly write to this stream without using an
+ * additional buffer, providing a faster serialization.
+ *
+ * Users of this class has to explicitly close the stream to avoid style check
+ * errors even though close is no-op when the underlying stream is not set.
*/
-public class UnsafeByteArrayOutputStream extends OutputStream
+public class UnsafeByteArrayOutputStream extends Output
implements ExtendedDataOutput {
static {
try {
@@ -61,11 +69,6 @@
private static final long BYTE_ARRAY_OFFSET =
UNSAFE.arrayBaseOffset(byte[].class);
- /** Byte buffer */
- private byte[] buf;
- /** Position in the buffer */
- private int pos = 0;
-
/**
* Constructor
*/
@@ -79,7 +82,8 @@
* @param size Initial size of the underlying byte array
*/
public UnsafeByteArrayOutputStream(int size) {
- buf = new byte[size];
+ buffer = new byte[size];
+ capacity = size;
}
/**
@@ -89,10 +93,11 @@
*/
public UnsafeByteArrayOutputStream(byte[] buf) {
if (buf == null) {
- this.buf = new byte[DEFAULT_BYTES];
+ this.buffer = new byte[DEFAULT_BYTES];
} else {
- this.buf = buf;
+ this.buffer = buf;
}
+ capacity = this.buffer.length;
}
/**
@@ -103,7 +108,7 @@
*/
public UnsafeByteArrayOutputStream(byte[] buf, int pos) {
this(buf);
- this.pos = pos;
+ this.position = pos;
}
/**
@@ -112,148 +117,161 @@
*
* @param size Size to add
*/
- private void ensureSize(int size) {
- if (pos + size > buf.length) {
- byte[] newBuf = new byte[(buf.length + size) << 1];
- System.arraycopy(buf, 0, newBuf, 0, pos);
- buf = newBuf;
+ @Override
+ protected boolean require(int size) {
+ if (position + size > buffer.length) {
+ byte[] newBuf = new byte[(buffer.length + size) << 1];
+ System.arraycopy(buffer, 0, newBuf, 0, position);
+ buffer = newBuf;
+ capacity = buffer.length;
+ return true;
}
+ return false;
}
@Override
public byte[] getByteArray() {
- return buf;
+ return buffer;
}
@Override
public byte[] toByteArray() {
- return Arrays.copyOf(buf, pos);
+ return Arrays.copyOf(buffer, position);
}
@Override
public byte[] toByteArray(int offset, int length) {
- if (offset + length > pos) {
+ if (offset + length > position) {
throw new IndexOutOfBoundsException(String.format("Offset: %d + " +
- "Length: %d exceeds the size of buf : %d", offset, length, pos));
+ "Length: %d exceeds the size of buffer : %d",
+ offset, length, position));
}
- return Arrays.copyOfRange(buf, offset, length);
+ return Arrays.copyOfRange(buffer, offset, length);
}
@Override
public void reset() {
- pos = 0;
+ position = 0;
}
@Override
public int getPos() {
- return pos;
+ return position;
}
@Override
- public void write(int b) throws IOException {
- ensureSize(SIZE_OF_BYTE);
- buf[pos] = (byte) b;
- pos += SIZE_OF_BYTE;
+ public void write(int b) {
+ require(SIZE_OF_BYTE);
+ buffer[position] = (byte) b;
+ position += SIZE_OF_BYTE;
}
@Override
- public void write(byte[] b) throws IOException {
- ensureSize(b.length);
- System.arraycopy(b, 0, buf, pos, b.length);
- pos += b.length;
+ public void write(byte[] b) {
+ require(b.length);
+ System.arraycopy(b, 0, buffer, position, b.length);
+ position += b.length;
}
@Override
- public void write(byte[] b, int off, int len) throws IOException {
- ensureSize(len);
- System.arraycopy(b, off, buf, pos, len);
- pos += len;
+ public void write(byte[] b, int off, int len) {
+ require(len);
+ System.arraycopy(b, off, buffer, position, len);
+ position += len;
}
@Override
- public void writeBoolean(boolean v) throws IOException {
- ensureSize(SIZE_OF_BOOLEAN);
- UNSAFE.putBoolean(buf, BYTE_ARRAY_OFFSET + pos, v);
- pos += SIZE_OF_BOOLEAN;
+ public void writeBoolean(boolean v) {
+ require(SIZE_OF_BOOLEAN);
+ UNSAFE.putBoolean(buffer, BYTE_ARRAY_OFFSET + position, v);
+ position += SIZE_OF_BOOLEAN;
}
@Override
- public void writeByte(int v) throws IOException {
- ensureSize(SIZE_OF_BYTE);
- UNSAFE.putByte(buf, BYTE_ARRAY_OFFSET + pos, (byte) v);
- pos += SIZE_OF_BYTE;
+ public void writeByte(int v) {
+ require(SIZE_OF_BYTE);
+ UNSAFE.putByte(buffer, BYTE_ARRAY_OFFSET + position, (byte) v);
+ position += SIZE_OF_BYTE;
}
@Override
- public void writeShort(int v) throws IOException {
- ensureSize(SIZE_OF_SHORT);
- UNSAFE.putShort(buf, BYTE_ARRAY_OFFSET + pos, (short) v);
- pos += SIZE_OF_SHORT;
+ public void writeShort(int v) {
+ require(SIZE_OF_SHORT);
+ UNSAFE.putShort(buffer, BYTE_ARRAY_OFFSET + position, (short) v);
+ position += SIZE_OF_SHORT;
}
@Override
public void writeChar(int v) throws IOException {
- ensureSize(SIZE_OF_CHAR);
- UNSAFE.putChar(buf, BYTE_ARRAY_OFFSET + pos, (char) v);
- pos += SIZE_OF_CHAR;
+ require(SIZE_OF_CHAR);
+ UNSAFE.putChar(buffer, BYTE_ARRAY_OFFSET + position, (char) v);
+ position += SIZE_OF_CHAR;
}
@Override
- public void writeInt(int v) throws IOException {
- ensureSize(SIZE_OF_INT);
- UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, v);
- pos += SIZE_OF_INT;
+ public void writeChar(char v) {
+ require(SIZE_OF_CHAR);
+ UNSAFE.putChar(buffer, BYTE_ARRAY_OFFSET + position, v);
+ position += SIZE_OF_CHAR;
+ }
+
+ @Override
+ public void writeInt(int v) {
+ require(SIZE_OF_INT);
+ UNSAFE.putInt(buffer, BYTE_ARRAY_OFFSET + position, v);
+ position += SIZE_OF_INT;
}
@Override
public void ensureWritable(int minSize) {
- if ((pos + minSize) > buf.length) {
- buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + minSize));
+ if ((position + minSize) > buffer.length) {
+ buffer = Arrays.copyOf(buffer,
+ Math.max(buffer.length << 1, position + minSize));
}
}
@Override
public void skipBytes(int bytesToSkip) {
ensureWritable(bytesToSkip);
- pos += bytesToSkip;
+ position += bytesToSkip;
}
@Override
public void writeInt(int pos, int value) {
- if (pos + SIZE_OF_INT > this.pos) {
+ if (pos + SIZE_OF_INT > this.position) {
throw new IndexOutOfBoundsException(
"writeInt: Tried to write int to position " + pos +
- " but current length is " + this.pos);
+ " but current length is " + this.position);
}
- UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, value);
+ UNSAFE.putInt(buffer, BYTE_ARRAY_OFFSET + pos, value);
}
@Override
- public void writeLong(long v) throws IOException {
- ensureSize(SIZE_OF_LONG);
- UNSAFE.putLong(buf, BYTE_ARRAY_OFFSET + pos, v);
- pos += SIZE_OF_LONG;
+ public void writeLong(long v) {
+ require(SIZE_OF_LONG);
+ UNSAFE.putLong(buffer, BYTE_ARRAY_OFFSET + position, v);
+ position += SIZE_OF_LONG;
}
@Override
- public void writeFloat(float v) throws IOException {
- ensureSize(SIZE_OF_FLOAT);
- UNSAFE.putFloat(buf, BYTE_ARRAY_OFFSET + pos, v);
- pos += SIZE_OF_FLOAT;
+ public void writeFloat(float v) {
+ require(SIZE_OF_FLOAT);
+ UNSAFE.putFloat(buffer, BYTE_ARRAY_OFFSET + position, v);
+ position += SIZE_OF_FLOAT;
}
@Override
- public void writeDouble(double v) throws IOException {
- ensureSize(SIZE_OF_DOUBLE);
- UNSAFE.putDouble(buf, BYTE_ARRAY_OFFSET + pos, v);
- pos += SIZE_OF_DOUBLE;
+ public void writeDouble(double v) {
+ require(SIZE_OF_DOUBLE);
+ UNSAFE.putDouble(buffer, BYTE_ARRAY_OFFSET + position, v);
+ position += SIZE_OF_DOUBLE;
}
@Override
public void writeBytes(String s) throws IOException {
// Note that this code is mostly copied from DataOutputStream
int len = s.length();
- ensureSize(len);
+ require(len);
for (int i = 0; i < len; i++) {
int v = s.charAt(i);
writeByte(v);
@@ -264,7 +282,7 @@
public void writeChars(String s) throws IOException {
// Note that this code is mostly copied from DataOutputStream
int len = s.length();
- ensureSize(len * SIZE_OF_CHAR);
+ require(len * SIZE_OF_CHAR);
for (int i = 0; i < len; i++) {
int v = s.charAt(i);
writeChar(v);
@@ -295,7 +313,7 @@
"encoded string too long: " + utflen + " bytes");
}
- ensureSize(utflen + SIZE_OF_SHORT);
+ require(utflen + SIZE_OF_SHORT);
writeShort(utflen);
int i = 0;
@@ -304,21 +322,21 @@
if (!((c >= 0x0001) && (c <= 0x007F))) {
break;
}
- buf[pos++] = (byte) c;
+ buffer[position++] = (byte) c;
}
for (; i < strlen; i++) {
c = s.charAt(i);
if ((c >= 0x0001) && (c <= 0x007F)) {
- buf[pos++] = (byte) c;
+ buffer[position++] = (byte) c;
} else if (c > 0x07FF) {
- buf[pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
- buf[pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
- buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ buffer[position++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ buffer[position++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ buffer[position++] = (byte) (0x80 | ((c >> 0) & 0x3F));
} else {
- buf[pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
- buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ buffer[position++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ buffer[position++] = (byte) (0x80 | ((c >> 0) & 0x3F));
}
}
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
index 39ab352..4053ca6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
@@ -18,6 +18,8 @@
package org.apache.giraph.utils;
+import com.esotericsoftware.kryo.io.Input;
+
import java.io.IOException;
import java.io.UTFDataFormatException;
@@ -25,12 +27,7 @@
* Byte array input stream that uses Unsafe methods to deserialize
* much faster
*/
-public abstract class UnsafeReads implements ExtendedDataInput {
-
- /** Buffer length */
- protected int bufLength;
- /** Position in the buffer */
- protected long pos = 0;
+public abstract class UnsafeReads extends Input implements ExtendedDataInput {
/**
* Constructor
@@ -38,7 +35,7 @@
* @param length buf length
*/
public UnsafeReads(int length) {
- bufLength = length;
+ limit = length;
}
/**
@@ -48,8 +45,8 @@
* @param length buf length
*/
public UnsafeReads(long offset, int length) {
- pos = offset;
- bufLength = length;
+ position = (int) offset;
+ limit = length;
}
/**
@@ -72,17 +69,19 @@
* @param requiredBytes Bytes required to read
* @throws IOException When there are not enough bytes to read
*/
- protected void ensureRemaining(int requiredBytes) throws IOException {
+ @Override
+ protected int require(int requiredBytes) {
if (available() < requiredBytes) {
- throw new IOException("ensureRemaining: Only " + available() +
- " bytes remaining, trying to read " + requiredBytes);
+ throw new IndexOutOfBoundsException("require: Only " +
+ available() + " bytes remaining, trying to read " + requiredBytes);
}
+ return available();
}
@Override
- public int skipBytes(int n) throws IOException {
- ensureRemaining(n);
- pos += n;
+ public int skipBytes(int n) {
+ require(n);
+ position += n;
return n;
}
@@ -105,7 +104,7 @@
case '\r':
int c2 = readByte();
if ((c2 != '\n') && (c2 != -1)) {
- pos -= 1;
+ position -= 1;
}
break loop;
default:
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java
index a75815a..679119f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java
@@ -39,8 +39,8 @@
* @param length length of the valid data
*/
public void initialize(byte[] buf, int offset, int length) {
- this.buf = buf;
- this.pos = offset;
- this.bufLength = length;
+ this.buffer = buf;
+ this.position = offset;
+ this.limit = length;
}
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
index c8251b1..d0a68f3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
@@ -18,9 +18,9 @@
package org.apache.giraph.utils.io;
-import org.apache.giraph.utils.ExtendedByteArrayDataInput;
import org.apache.giraph.utils.ExtendedDataInput;
import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
@@ -36,7 +36,7 @@
public class BigDataInput implements ExtendedDataInput {
/** Empty data input */
private static final ExtendedDataInput EMPTY_INPUT =
- new ExtendedByteArrayDataInput(new byte[0]);
+ new UnsafeByteArrayInputStream(new byte[0]);
/** Input which we are currently reading from */
private ExtendedDataInput currentInput;