closes #148
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/BigDataByteArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/BigDataByteArrayEdges.java
index bca93eb..b34020b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/BigDataByteArrayEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/BigDataByteArrayEdges.java
@@ -38,269 +38,368 @@
import java.util.Optional;
/**
- * {@link OutEdges} implementation backed by a list of {@link ExtendedDataOutput}.
- * Parallel edges are allowed.
- * This implementation is similar to {@link ByteArrayEdges} except it is not limited by size of underlying array
- * and will allow adding new edges until machine runs out of memory
- * Note: this implementation is optimized for space usage,
- * but edge removals are expensive.
+ * {@link OutEdges} implementation backed by a list of
+ * {@link ExtendedDataOutput}. Parallel edges are allowed.
+ * <p>
+ * This implementation is similar to {@link ByteArrayEdges} except it is not
+ * limited by size of underlying array and will allow adding new edges until
+ * machine runs out of memory.
+ * <p>
+ * Note: this implementation is optimized for space usage, but edge removals
+ * are expensive.
*
* @param <I> Vertex id
* @param <E> Edge value
*/
-public class BigDataByteArrayEdges <I extends WritableComparable, E extends Writable>
- extends ConfigurableOutEdges<I, E>
- implements ReuseObjectsOutEdges<I, E>, Trimmable
-{
- private static final int DEFAULT_MAX_BUFFER_SIZE = Integer.MAX_VALUE;
- /** maximum buffer size. Sometimes it will be exceeded by the last edge, if last edge is bigger in size than anything seen before **/
- private final int maxBufferSize;
- /** expected initial buffer capacity. Use implementation default is empty **/
- private Optional<Integer> capacity = Optional.empty();
- /** Current serialized edges. */
- private ExtendedDataOutput currentDataOutput = null;
- /** All serialized edges. */
- private List<ExtendedDataOutput> dataOutputs;
- /** Number of edges. */
- private int edgeCount = 0;
- /** Temporary buffer used for one edge serialization */
- private ExtendedDataOutput tempDataOutput;
- @VisibleForTesting
- BigDataByteArrayEdges(int maxBufferSize) {
- this.maxBufferSize = maxBufferSize;
- }
+public class BigDataByteArrayEdges<I extends WritableComparable,
+ E extends Writable> extends ConfigurableOutEdges<I, E>
+ implements ReuseObjectsOutEdges<I, E>, Trimmable {
+ /**
+ * Default max buffer size.z
+ */
+ private static final int DEFAULT_MAX_BUFFER_SIZE = Integer.MAX_VALUE;
+ /**
+ * Maximum buffer size. Sometimes it will be exceeded by the last edge,
+ * if last edge is bigger in size than anything seen before
+ */
+ private final int maxBufferSize;
+ /**
+ * Expected initial buffer capacity. Use implementation default is empty
+ */
+ private Optional<Integer> capacity = Optional.empty();
+ /**
+ * Current serialized edges.
+ */
+ private ExtendedDataOutput currentDataOutput = null;
+ /**
+ * All serialized edges.
+ */
+ private List<ExtendedDataOutput> dataOutputs;
+ /**
+ * Number of edges.
+ */
+ private int edgeCount = 0;
+ /**
+ * Temporary buffer used for one edge serialization
+ */
+ private ExtendedDataOutput tempDataOutput;
- public BigDataByteArrayEdges()
- {
- this(DEFAULT_MAX_BUFFER_SIZE);
- }
+ /**
+ * Constructor that accepts max buffer size.
+ * @param maxBufferSize Max buffer size
+ */
+ @VisibleForTesting
+ BigDataByteArrayEdges(int maxBufferSize) {
+ this.maxBufferSize = maxBufferSize;
+ }
- @Override
- public void initialize(Iterable<Edge<I, E>> edges) {
- reset(Optional.empty());
- for (Edge<I, E> edge : edges) {
- add(edge);
- }
- }
+ /**
+ * Default constructor.
+ */
+ public BigDataByteArrayEdges() {
+ this(DEFAULT_MAX_BUFFER_SIZE);
+ }
- @Override
- public void initialize(int capacity) {
- reset(Optional.of(Math.min(capacity, maxBufferSize)));
+ /**
+ * Initialize from set of edges.
+ * @param edges Iterable of edges
+ */
+ @Override
+ public void initialize(Iterable<Edge<I, E>> edges) {
+ reset(Optional.empty());
+ for (Edge<I, E> edge : edges) {
+ add(edge);
}
+ }
- @Override
- public void initialize() {
- this.reset(this.capacity); //don't reset capacity
- }
+ /**
+ * Initialize by passing expected capacity.
+ * @param capacity Initial capacity
+ */
+ @Override
+ public void initialize(int capacity) {
+ reset(Optional.of(Math.min(capacity, maxBufferSize)));
+ }
- private void reset(Optional<Integer> capacity) {
- this.capacity = capacity;
- this.edgeCount = 0;
- this.currentDataOutput = null;
- this.dataOutputs = new ArrayList<>();
- this.tempDataOutput = getConf().createExtendedDataOutput();
+ /**
+ * Default initialization.
+ */
+ @Override
+ public void initialize() {
+ this.reset(this.capacity); //don't reset capacity
+ }
+
+ /**
+ * Reset capacity.
+ * @param capacity Capacity
+ */
+ private void reset(Optional<Integer> capacity) {
+ this.capacity = capacity;
+ this.edgeCount = 0;
+ this.currentDataOutput = null;
+ this.dataOutputs = new ArrayList<>();
+ this.tempDataOutput = getConf().createExtendedDataOutput();
+ createNextDataOutput();
+ }
+
+ /**
+ * Return whether maximum buffer size of current data output be exceeded
+ * if byteCount bytes are added to it
+ *
+ * @param byteCount Byte count to be added
+ * @return true if max size will be exceeded, false otherwise
+ */
+ private boolean isCurrentDataOutputFull(int byteCount) {
+ return currentDataOutput.getPos() + byteCount > maxBufferSize;
+ }
+
+ /**
+ * Create and return ExtendedDataOutput with specified capacity settings
+ * @return {@link ExtendedDataOutput} instance
+ */
+ private ExtendedDataOutput createExtendedDataOutput() {
+ return capacity.isPresent() ?
+ getConf().createExtendedDataOutput(capacity.get()) :
+ getConf().createExtendedDataOutput();
+ }
+
+ /**
+ * Create new current data output
+ */
+ private void createNextDataOutput() {
+ currentDataOutput = createExtendedDataOutput();
+ dataOutputs.add(currentDataOutput);
+ }
+
+ /**
+ * Add edge.
+ * @param edge Edge to add
+ */
+ @Override
+ public void add(Edge<I, E> edge) {
+ try {
+ tempDataOutput.reset();
+ WritableUtils.writeEdge(tempDataOutput, edge);
+ int edgeByteSize = tempDataOutput.getPos();
+ if (isCurrentDataOutputFull(edgeByteSize)) {
createNextDataOutput();
- }
+ }
+ currentDataOutput.write(tempDataOutput.getByteArray(), 0, edgeByteSize);
+ edgeCount++;
- /**
- * Return whether maximum buffer size of current data output be exceeded
- * if byteCount bytes are added to it
- */
- private boolean isCurrentDataOutputFull(int byteCount) {
- return currentDataOutput.getPos() + byteCount > maxBufferSize;
+ } catch (IOException e) {
+ throw new IllegalStateException("add: Failed to write to the new " +
+ "byte array");
}
+ }
- /**
- * Create and return ExtendedDataOutput with specified capacity settings
- */
- private ExtendedDataOutput createExtendedDataOutput() {
- return capacity.isPresent() ?
- getConf().createExtendedDataOutput(capacity.get()) :
- getConf().createExtendedDataOutput();
- }
-
- /**
- * Create new current data output
- */
- private void createNextDataOutput() {
- currentDataOutput = createExtendedDataOutput();
- dataOutputs.add(currentDataOutput);
- }
-
- @Override
- public void add(Edge<I, E> edge) {
+ /**
+ * Removes edge based on target vertex ID.
+ * @param targetVertexId Target vertex id
+ */
+ @Override
+ public void remove(I targetVertexId) {
+ // Note that this is very expensive (deserializes all edges).
+ List<ExtendedDataOutput> outputsToRemove = new LinkedList<>();
+ ReusableEdge<I, E> representativeEdge =
+ getConf().createReusableEdge();
+ for (int outputIdx = 0; outputIdx < dataOutputs.size(); outputIdx++) {
+ ExtendedDataOutput output = dataOutputs.get(outputIdx);
+ ExtendedDataInput input =
+ getConf().createExtendedDataInput(output.toByteArray());
+ ExtendedDataOutput updatedOutput = createExtendedDataOutput();
+ int remainingCount = 0;
+ boolean edgesRemoved = false;
+ while (!input.endOfInput()) {
try {
- tempDataOutput.reset();
- WritableUtils.writeEdge(tempDataOutput, edge);
- int edgeByteSize = tempDataOutput.getPos();
- if (isCurrentDataOutputFull(edgeByteSize)) {
- createNextDataOutput();
- }
- currentDataOutput.write(tempDataOutput.getByteArray(), 0, edgeByteSize);
- edgeCount++;
+ WritableUtils.readEdge(input, representativeEdge);
+ if (representativeEdge.getTargetVertexId().equals(targetVertexId)) {
+ edgesRemoved = true;
+ edgeCount--;
+ } else {
+ remainingCount++;
+ WritableUtils.writeEdge(updatedOutput, representativeEdge);
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+ if (edgesRemoved) {
+ if (remainingCount == 0) {
+ outputsToRemove.add(output);
+ } else {
+ dataOutputs.set(outputIdx, updatedOutput);
+ }
+ }
- }
- catch (IOException e) {
- throw new IllegalStateException("add: Failed to write to the new " +
- "byte array");
- }
}
- @Override
- public void remove(I targetVertexId) {
- // Note that this is very expensive (deserializes all edges).
- List<ExtendedDataOutput> outputsToRemove = new LinkedList<>();
- ReusableEdge<I, E> representativeEdge =
- getConf().createReusableEdge();
- for (int outputIdx = 0; outputIdx < dataOutputs.size(); outputIdx++) {
- ExtendedDataOutput output = dataOutputs.get(outputIdx);
- ExtendedDataInput input = getConf().createExtendedDataInput(output.toByteArray());
- ExtendedDataOutput updatedOutput = createExtendedDataOutput();
- int remainingCount = 0;
- boolean edgesRemoved = false;
- while(!input.endOfInput()) {
- try {
- WritableUtils.readEdge(input, representativeEdge);
- if (representativeEdge.getTargetVertexId().equals(targetVertexId)) {
- edgesRemoved = true;
- edgeCount--;
- }
- else {
- remainingCount++;
- WritableUtils.writeEdge(updatedOutput, representativeEdge);
- }
- }
- catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
- }
- if (edgesRemoved) {
- if (remainingCount == 0) {
- outputsToRemove.add(output);
- }
- else {
- dataOutputs.set(outputIdx, updatedOutput);
- }
- }
-
- }
-
- if (!outputsToRemove.isEmpty()) {
- dataOutputs.removeAll(outputsToRemove);
- }
-
- if (dataOutputs.isEmpty()) {
- createNextDataOutput();
- }
- else {
- currentDataOutput = dataOutputs.get(dataOutputs.size() - 1);
- }
+ if (!outputsToRemove.isEmpty()) {
+ dataOutputs.removeAll(outputsToRemove);
}
- @Override
- public int size() {
- return edgeCount;
+ if (dataOutputs.isEmpty()) {
+ createNextDataOutput();
+ } else {
+ currentDataOutput = dataOutputs.get(dataOutputs.size() - 1);
+ }
+ }
+
+ /**
+ * Returns number of edges
+ * @return number of edges.
+ */
+ @Override
+ public int size() {
+ return edgeCount;
+ }
+
+ /**
+ * Returns iterator of edges.
+ * @return Edge iterator.
+ */
+ @Override
+ public Iterator<Edge<I, E>> iterator() {
+ return new BigDataByteArrayEdgeIterator(dataOutputs.iterator());
+ }
+
+ /**
+ * Compact data output at given data output index
+ * @param dataOutputIdx Data output index
+ */
+ private void trim(int dataOutputIdx) {
+ if (dataOutputIdx >= dataOutputs.size()) {
+ return;
+ }
+ ExtendedDataOutput output = dataOutputs.get(dataOutputIdx);
+ if (output.getPos() < output.getByteArray().length) {
+ byte[] trimmedBytes = output.toByteArray();
+ output =
+ getConf().createExtendedDataOutput(trimmedBytes, trimmedBytes.length);
+ dataOutputs.set(dataOutputIdx, output);
+ if (dataOutputIdx == dataOutputs.size() - 1) {
+ currentDataOutput = output;
+ }
+ }
+ }
+
+ /**
+ * Compact all data output buffers by replacing every data output to buffer
+ * whose size exceeds current position with new buffer whose size fits the
+ * data and copying data into it
+ */
+ @Override
+ public void trim() {
+ if (currentDataOutput == null) {
+ return;
}
- @Override
- public Iterator<Edge<I, E>> iterator() {
- return new BigDataByteArrayEdgeIterator(dataOutputs.iterator());
+ for (int dataOutputIdx = 0; dataOutputIdx < dataOutputs.size();
+ dataOutputIdx++) {
+ trim(dataOutputIdx);
+ }
+ }
+
+ /**
+ * Serialization logic.
+ * @param out {@link DataOutput} instance
+ * @throws IOException
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(dataOutputs.size());
+ for (ExtendedDataOutput dataOutput : dataOutputs) {
+ out.writeInt(dataOutput.getPos());
+ out.write(dataOutput.getByteArray(), 0, dataOutput.getPos());
+ }
+ out.writeInt(edgeCount);
+ }
+
+ /**
+ * Deserialization logic.
+ * @param in {@link DataInput} instance
+ * @throws IOException
+ */
+ @Override
+ public void readFields(DataInput in)
+ throws IOException {
+ initialize();
+ int dataOutputsCount = in.readInt();
+ for (int dataOutputIdx = 0; dataOutputIdx < dataOutputsCount;
+ dataOutputIdx++) {
+ byte[] bytes = new byte[in.readInt()];
+ in.readFully(bytes, 0, bytes.length);
+ currentDataOutput.write(bytes);
+ }
+ edgeCount = in.readInt();
+ }
+
+ /**
+ * Implements edge iterator.
+ */
+ private class BigDataByteArrayEdgeIterator
+ extends UnmodifiableIterator<Edge<I, E>> {
+ /**
+ * Iterator of ExtendedDataOutput instances to merge.
+ */
+ private final Iterator<ExtendedDataOutput> byteArrayEdgesListIterator;
+ /**
+ * Current data input instance.
+ */
+ private ExtendedDataInput currentInput = null;
+ /**
+ * Representative edge object.
+ */
+ private ReusableEdge<I, E> representativeEdge =
+ getConf().createReusableEdge();
+
+ /**
+ * Constructs iterator by passing multiple iterators to be merged.
+ * @param byteArrayEdgesListIterator List of iterators
+ */
+ private BigDataByteArrayEdgeIterator(
+ Iterator<ExtendedDataOutput> byteArrayEdgesListIterator) {
+ this.byteArrayEdgesListIterator = byteArrayEdgesListIterator;
}
/**
- * Compact data output at given data output index
+ * Checks if there is more data in the iterators.
+ * @return True if there is more data, false otherwise.
*/
- private void trim(int dataOutputIdx) {
- if (dataOutputIdx >= dataOutputs.size()) {
- return;
- }
- ExtendedDataOutput output = dataOutputs.get(dataOutputIdx);
- if (output.getPos() < output.getByteArray().length) {
- byte [] trimmedBytes = output.toByteArray();
- output = getConf().createExtendedDataOutput(trimmedBytes, trimmedBytes.length);
- dataOutputs.set(dataOutputIdx, output);
- if (dataOutputIdx == dataOutputs.size() - 1) {
- currentDataOutput = output;
- }
- }
+ private boolean inputHasNext() {
+ return currentInput != null && !currentInput.endOfInput();
}
/**
- * Compact all data output buffers
- * by replacing every data output to buffer
- * whose size exceeds current position with new buffer whose size fits the data
- * and copying data into it
+ * Checks if the iterator has more edges.
+ * @return True if iterator has more edges, false otherwise.
*/
@Override
- public void trim() {
- if (currentDataOutput == null) {
- return;
- }
-
- for (int dataOutputIdx = 0; dataOutputIdx < dataOutputs.size(); dataOutputIdx++) {
- trim(dataOutputIdx);
- }
+ public boolean hasNext() {
+ while (!inputHasNext() && byteArrayEdgesListIterator.hasNext()) {
+ ExtendedDataOutput currentOutput = byteArrayEdgesListIterator.next();
+ currentInput =
+ getConf().createExtendedDataInput(currentOutput.toByteArray());
+ }
+ return inputHasNext();
}
+ /**
+ * Advances the iterator.
+ * @return Next edge object.
+ */
@Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(dataOutputs.size());
- for (ExtendedDataOutput dataOutput : dataOutputs) {
- out.writeInt(dataOutput.getPos());
- out.write(dataOutput.getByteArray(), 0, dataOutput.getPos());
- }
- out.writeInt(edgeCount);
+ public Edge<I, E> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ try {
+ WritableUtils.readEdge(currentInput, representativeEdge);
+ } catch (IOException e) {
+ throw new IllegalStateException("next: Failed on pos " +
+ currentInput.getPos() + " edge " + representativeEdge);
+ }
+ return representativeEdge;
}
-
- @Override
- public void readFields(DataInput in)
- throws IOException {
- initialize();
- int dataOutputsCount = in.readInt();
- for (int dataOutputIdx = 0; dataOutputIdx < dataOutputsCount; dataOutputIdx++) {
- byte [] bytes = new byte[in.readInt()];
- in.readFully(bytes, 0, bytes.length);
- currentDataOutput.write(bytes);
- }
- edgeCount = in.readInt();
- }
-
- private class BigDataByteArrayEdgeIterator
- extends UnmodifiableIterator<Edge<I, E>> {
- private final Iterator<ExtendedDataOutput> byteArrayEdgesListIterator;
- ExtendedDataInput currentInput = null;
- /** Representative edge object. */
- private ReusableEdge<I, E> representativeEdge =
- getConf().createReusableEdge();
- private BigDataByteArrayEdgeIterator(Iterator<ExtendedDataOutput> byteArrayEdgesListIterator) {
- this.byteArrayEdgesListIterator = byteArrayEdgesListIterator;
- }
- private boolean inputHasNext() {
- return currentInput != null && !currentInput.endOfInput();
- }
- @Override
- public boolean hasNext()
- {
- while (!inputHasNext() && byteArrayEdgesListIterator.hasNext()) {
- ExtendedDataOutput currentOutput = byteArrayEdgesListIterator.next();
- currentInput = getConf().createExtendedDataInput(currentOutput.toByteArray());
- }
- return inputHasNext();
- }
-
- @Override
- public Edge<I, E> next()
- {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- try {
- WritableUtils.readEdge(currentInput, representativeEdge);
- } catch (IOException e) {
- throw new IllegalStateException("next: Failed on pos " +
- currentInput.getPos() + " edge " + representativeEdge);
- }
- return representativeEdge;
- }
- }
+ }
}
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/BigDataByteArrayEdgesTest.java b/giraph-core/src/test/java/org/apache/giraph/edge/BigDataByteArrayEdgesTest.java
index fe1ce40..e12f9b1 100644
--- a/giraph-core/src/test/java/org/apache/giraph/edge/BigDataByteArrayEdgesTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/BigDataByteArrayEdgesTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.giraph.edge;
import junit.framework.TestCase;