GIRAPH-1250
closes #147
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/BigDataByteArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/BigDataByteArrayEdges.java
new file mode 100644
index 0000000..bca93eb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/BigDataByteArrayEdges.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.Trimmable;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+/**
+ * {@link OutEdges} implementation backed by a list of {@link ExtendedDataOutput}.
+ * Parallel edges are allowed.
+ * This implementation is similar to {@link ByteArrayEdges} except it is not limited by size of underlying array
+ * and will allow adding new edges until machine runs out of memory
+ * Note: this implementation is optimized for space usage,
+ * but edge removals are expensive.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class BigDataByteArrayEdges <I extends WritableComparable, E extends Writable>
+ extends ConfigurableOutEdges<I, E>
+ implements ReuseObjectsOutEdges<I, E>, Trimmable
+{
+ private static final int DEFAULT_MAX_BUFFER_SIZE = Integer.MAX_VALUE;
+ /** maximum buffer size. Sometimes it will be exceeded by the last edge, if last edge is bigger in size than anything seen before **/
+ private final int maxBufferSize;
+ /** expected initial buffer capacity. Use implementation default is empty **/
+ private Optional<Integer> capacity = Optional.empty();
+ /** Current serialized edges. */
+ private ExtendedDataOutput currentDataOutput = null;
+ /** All serialized edges. */
+ private List<ExtendedDataOutput> dataOutputs;
+ /** Number of edges. */
+ private int edgeCount = 0;
+ /** Temporary buffer used for one edge serialization */
+ private ExtendedDataOutput tempDataOutput;
+ @VisibleForTesting
+ BigDataByteArrayEdges(int maxBufferSize) {
+ this.maxBufferSize = maxBufferSize;
+ }
+
+ public BigDataByteArrayEdges()
+ {
+ this(DEFAULT_MAX_BUFFER_SIZE);
+ }
+
+ @Override
+ public void initialize(Iterable<Edge<I, E>> edges) {
+ reset(Optional.empty());
+ for (Edge<I, E> edge : edges) {
+ add(edge);
+ }
+ }
+
+ @Override
+ public void initialize(int capacity) {
+ reset(Optional.of(Math.min(capacity, maxBufferSize)));
+ }
+
+ @Override
+ public void initialize() {
+ this.reset(this.capacity); //don't reset capacity
+ }
+
+ private void reset(Optional<Integer> capacity) {
+ this.capacity = capacity;
+ this.edgeCount = 0;
+ this.currentDataOutput = null;
+ this.dataOutputs = new ArrayList<>();
+ this.tempDataOutput = getConf().createExtendedDataOutput();
+ createNextDataOutput();
+ }
+
+ /**
+ * Return whether maximum buffer size of current data output be exceeded
+ * if byteCount bytes are added to it
+ */
+ private boolean isCurrentDataOutputFull(int byteCount) {
+ return currentDataOutput.getPos() + byteCount > maxBufferSize;
+ }
+
+ /**
+ * Create and return ExtendedDataOutput with specified capacity settings
+ */
+ private ExtendedDataOutput createExtendedDataOutput() {
+ return capacity.isPresent() ?
+ getConf().createExtendedDataOutput(capacity.get()) :
+ getConf().createExtendedDataOutput();
+ }
+
+ /**
+ * Create new current data output
+ */
+ private void createNextDataOutput() {
+ currentDataOutput = createExtendedDataOutput();
+ dataOutputs.add(currentDataOutput);
+ }
+
+ @Override
+ public void add(Edge<I, E> edge) {
+ try {
+ tempDataOutput.reset();
+ WritableUtils.writeEdge(tempDataOutput, edge);
+ int edgeByteSize = tempDataOutput.getPos();
+ if (isCurrentDataOutputFull(edgeByteSize)) {
+ createNextDataOutput();
+ }
+ currentDataOutput.write(tempDataOutput.getByteArray(), 0, edgeByteSize);
+ edgeCount++;
+
+ }
+ catch (IOException e) {
+ throw new IllegalStateException("add: Failed to write to the new " +
+ "byte array");
+ }
+ }
+
+ @Override
+ public void remove(I targetVertexId) {
+ // Note that this is very expensive (deserializes all edges).
+ List<ExtendedDataOutput> outputsToRemove = new LinkedList<>();
+ ReusableEdge<I, E> representativeEdge =
+ getConf().createReusableEdge();
+ for (int outputIdx = 0; outputIdx < dataOutputs.size(); outputIdx++) {
+ ExtendedDataOutput output = dataOutputs.get(outputIdx);
+ ExtendedDataInput input = getConf().createExtendedDataInput(output.toByteArray());
+ ExtendedDataOutput updatedOutput = createExtendedDataOutput();
+ int remainingCount = 0;
+ boolean edgesRemoved = false;
+ while(!input.endOfInput()) {
+ try {
+ WritableUtils.readEdge(input, representativeEdge);
+ if (representativeEdge.getTargetVertexId().equals(targetVertexId)) {
+ edgesRemoved = true;
+ edgeCount--;
+ }
+ else {
+ remainingCount++;
+ WritableUtils.writeEdge(updatedOutput, representativeEdge);
+ }
+ }
+ catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+ if (edgesRemoved) {
+ if (remainingCount == 0) {
+ outputsToRemove.add(output);
+ }
+ else {
+ dataOutputs.set(outputIdx, updatedOutput);
+ }
+ }
+
+ }
+
+ if (!outputsToRemove.isEmpty()) {
+ dataOutputs.removeAll(outputsToRemove);
+ }
+
+ if (dataOutputs.isEmpty()) {
+ createNextDataOutput();
+ }
+ else {
+ currentDataOutput = dataOutputs.get(dataOutputs.size() - 1);
+ }
+ }
+
+ @Override
+ public int size() {
+ return edgeCount;
+ }
+
+ @Override
+ public Iterator<Edge<I, E>> iterator() {
+ return new BigDataByteArrayEdgeIterator(dataOutputs.iterator());
+ }
+
+ /**
+ * Compact data output at given data output index
+ */
+ private void trim(int dataOutputIdx) {
+ if (dataOutputIdx >= dataOutputs.size()) {
+ return;
+ }
+ ExtendedDataOutput output = dataOutputs.get(dataOutputIdx);
+ if (output.getPos() < output.getByteArray().length) {
+ byte [] trimmedBytes = output.toByteArray();
+ output = getConf().createExtendedDataOutput(trimmedBytes, trimmedBytes.length);
+ dataOutputs.set(dataOutputIdx, output);
+ if (dataOutputIdx == dataOutputs.size() - 1) {
+ currentDataOutput = output;
+ }
+ }
+ }
+
+ /**
+ * Compact all data output buffers
+ * by replacing every data output to buffer
+ * whose size exceeds current position with new buffer whose size fits the data
+ * and copying data into it
+ */
+ @Override
+ public void trim() {
+ if (currentDataOutput == null) {
+ return;
+ }
+
+ for (int dataOutputIdx = 0; dataOutputIdx < dataOutputs.size(); dataOutputIdx++) {
+ trim(dataOutputIdx);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(dataOutputs.size());
+ for (ExtendedDataOutput dataOutput : dataOutputs) {
+ out.writeInt(dataOutput.getPos());
+ out.write(dataOutput.getByteArray(), 0, dataOutput.getPos());
+ }
+ out.writeInt(edgeCount);
+ }
+
+ @Override
+ public void readFields(DataInput in)
+ throws IOException {
+ initialize();
+ int dataOutputsCount = in.readInt();
+ for (int dataOutputIdx = 0; dataOutputIdx < dataOutputsCount; dataOutputIdx++) {
+ byte [] bytes = new byte[in.readInt()];
+ in.readFully(bytes, 0, bytes.length);
+ currentDataOutput.write(bytes);
+ }
+ edgeCount = in.readInt();
+ }
+
+ private class BigDataByteArrayEdgeIterator
+ extends UnmodifiableIterator<Edge<I, E>> {
+ private final Iterator<ExtendedDataOutput> byteArrayEdgesListIterator;
+ ExtendedDataInput currentInput = null;
+ /** Representative edge object. */
+ private ReusableEdge<I, E> representativeEdge =
+ getConf().createReusableEdge();
+ private BigDataByteArrayEdgeIterator(Iterator<ExtendedDataOutput> byteArrayEdgesListIterator) {
+ this.byteArrayEdgesListIterator = byteArrayEdgesListIterator;
+ }
+ private boolean inputHasNext() {
+ return currentInput != null && !currentInput.endOfInput();
+ }
+ @Override
+ public boolean hasNext()
+ {
+ while (!inputHasNext() && byteArrayEdgesListIterator.hasNext()) {
+ ExtendedDataOutput currentOutput = byteArrayEdgesListIterator.next();
+ currentInput = getConf().createExtendedDataInput(currentOutput.toByteArray());
+ }
+ return inputHasNext();
+ }
+
+ @Override
+ public Edge<I, E> next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ try {
+ WritableUtils.readEdge(currentInput, representativeEdge);
+ } catch (IOException e) {
+ throw new IllegalStateException("next: Failed on pos " +
+ currentInput.getPos() + " edge " + representativeEdge);
+ }
+ return representativeEdge;
+ }
+ }
+}
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/BigDataByteArrayEdgesTest.java b/giraph-core/src/test/java/org/apache/giraph/edge/BigDataByteArrayEdgesTest.java
new file mode 100644
index 0000000..fe1ce40
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/BigDataByteArrayEdgesTest.java
@@ -0,0 +1,321 @@
+package org.apache.giraph.edge;
+
+import junit.framework.TestCase;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class BigDataByteArrayEdgesTest
+ extends TestCase
+{
+ private static ImmutableClassesGiraphConfiguration createConfiguration() {
+ return createConfiguration(LongWritable.class);
+ }
+ private static <T extends Writable> ImmutableClassesGiraphConfiguration createConfiguration(Class<T> clazz) {
+ GiraphConfiguration conf = new GiraphConfiguration();
+ GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
+ GiraphConstants.EDGE_VALUE_CLASS.set(conf, clazz);
+ return new ImmutableClassesGiraphConfiguration<>(conf);
+ }
+
+ private static BigDataByteArrayEdges<LongWritable, LongWritable> createBigDataByteArrayEdges(int maxBufferSize) {
+ BigDataByteArrayEdges<LongWritable, LongWritable> edges = new BigDataByteArrayEdges<>(maxBufferSize);
+ edges.setConf(createConfiguration());
+ return edges;
+ }
+
+ private static List<Edge<LongWritable, LongWritable>> createEdges(Edge<LongWritable, LongWritable> ...edges) {
+ return new ArrayList<>(Arrays.asList(edges));
+ }
+
+ private static ExtendedDataOutput createDataOutput(List<Edge<LongWritable, LongWritable>> edges) throws IOException {
+ final int NUM_FLUFF_INTS = 8;
+ ExtendedDataOutput dataOutput = createConfiguration().createExtendedDataOutput();
+ dataOutput.writeInt(1); // 1 data output in big data set
+ dataOutput.writeInt(0); // byte count (to be filled later after edges have been written)
+
+ edges.forEach(e -> {
+ try {
+ WritableUtils.writeEdge(dataOutput, e);
+ }
+ catch (IOException exc) {
+
+ }
+ });
+ dataOutput.writeInt(Integer.BYTES, dataOutput.getPos() - 2*Integer.BYTES); //update byte count with actual number
+ dataOutput.writeInt(edges.size());
+ for (int i = 0; i < NUM_FLUFF_INTS; i++) {
+ dataOutput.write(i); //some fluff at the end - make sure BigDataByteArrayEdges can handle it
+ }
+ return dataOutput;
+ }
+ private static ExtendedDataInput createDataInput(ExtendedDataOutput output) {
+ return new ImmutableClassesGiraphConfiguration<>(createConfiguration())
+ .createExtendedDataInput(output.toByteArray());
+ }
+ private static ExtendedDataInput createDataInput(List<Edge<LongWritable, LongWritable>> edges) throws IOException {
+ return createDataInput(createDataOutput(edges));
+ }
+
+ private static Edge<LongWritable, LongWritable> edge(long vertex, long value) {
+ DefaultEdge edge = new DefaultEdge<>();
+ edge.setTargetVertexId(new LongWritable(vertex));
+ edge.setValue(new LongWritable(value));
+ return edge;
+ }
+
+ private static Edge<LongWritable, Text> edge(long vertex, String value) {
+ DefaultEdge edge = new DefaultEdge<>();
+ edge.setTargetVertexId(new LongWritable(vertex));
+ edge.setValue(new Text(value));
+ return edge;
+ }
+
+ private static <T extends Writable> void assertEdges(List<Edge<LongWritable, T>> expectedList,
+ BigDataByteArrayEdges<LongWritable, T> edges) {
+ assertEquals(expectedList.size(), edges.size());
+ Iterator<Edge<LongWritable, T>> expected = expectedList.iterator();
+ Iterator<Edge<LongWritable, T>> actual = edges.iterator();
+
+ while (actual.hasNext() && expected.hasNext()) {
+ Edge<LongWritable, T> expectedValue = expected.next();
+ Edge<LongWritable, T> actualValue = actual.next();
+ assertEquals(expectedValue.getTargetVertexId(), actualValue.getTargetVertexId());
+ assertEquals(expectedValue.getValue(), actualValue.getValue());
+ }
+ assertTrue("Not equal size", !actual.hasNext() && !expected.hasNext());
+ }
+
+ //region test initialization
+ @Test
+ public void testInitializeEmpty()
+ {
+ BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+ createBigDataByteArrayEdges(8);
+ edges.initialize();
+ assertEdges(Arrays.asList(), edges);
+ }
+
+ @Test
+ public void testInitializeInitialCapacity()
+ {
+ BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+ createBigDataByteArrayEdges(64);
+ edges.initialize(32);
+ assertEdges(Arrays.asList(), edges);
+ }
+
+ @Test
+ public void testInitializeWithEdges()
+ {
+ BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+ createBigDataByteArrayEdges(10);
+ List<Edge<LongWritable, LongWritable>> initEdges =
+ createEdges(edge(1, 4), edge(2, 5), edge(3, 6));
+ edges.initialize(initEdges);
+ assertEdges(initEdges, edges);
+ }
+
+ @Test
+ public void testInitializeMultipleTimes()
+ {
+ BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+ createBigDataByteArrayEdges(10);
+ List<Edge<LongWritable, LongWritable>> initEdges =
+ createEdges(edge(1, 4), edge(2, 5), edge(3, 6));
+ edges.initialize(initEdges);
+ assertEdges(initEdges, edges);
+ edges.initialize();
+ assertEdges(Arrays.asList(), edges); //empty after reinitialization
+ initEdges.forEach(e -> edges.add(e));
+ assertEdges(initEdges, edges);
+ }
+ //endregion
+
+ //region add/remove edges
+ @Test
+ public void testAddEdgeThatExceedsMaxBufferSize()
+ {
+ BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+ createBigDataByteArrayEdges(3);
+ edges.initialize(3);
+ List<Edge<LongWritable, LongWritable>> inputEdges = createEdges(edge(1, 3), edge(2, 4));
+ inputEdges.forEach(e -> edges.add(e));
+ assertEdges(inputEdges, edges);
+ }
+
+ @Test
+ public void testAddEdgesTwoPerBuffer()
+ {
+ BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+ //each edge is 16 bytes
+ createBigDataByteArrayEdges(32);
+ edges.initialize();
+ List<Edge<LongWritable, LongWritable>> inputEdges = new ArrayList<>();
+ int vertex;
+ for (vertex = 0; vertex < 49; vertex++) { // make sure last data output is not full
+ inputEdges.add(edge(vertex, vertex % 3));
+ }
+ inputEdges.forEach(e -> edges.add(e));
+ assertEdges(inputEdges, edges);
+ edges.trim(); //verify that when last data output is trimmed, current is update correctly
+ assertEdges(inputEdges, edges);
+ inputEdges.add(edge(vertex, vertex % 3));
+ edges.add(inputEdges.get(inputEdges.size() - 1));
+ assertEdges(inputEdges, edges);
+ }
+
+ @Test
+ public void testEdgesOfDifferentSize()
+ {
+ BigDataByteArrayEdges<LongWritable, Text> edges = new BigDataByteArrayEdges<>(32);
+ edges.setConf(createConfiguration(Text.class));
+ edges.initialize();
+ List<Edge<LongWritable, Text>> inputEdges = new ArrayList<>();
+ inputEdges.add(edge(1, "Short String"));
+ inputEdges.add(edge(1, "Very Very Long, Super Long String"));
+ inputEdges.forEach(e -> edges.add(e));
+ assertEdges(inputEdges, edges);
+ }
+
+ @Test
+ public void testEdgesOfDefaultMaxSize()
+ {
+ BigDataByteArrayEdges<LongWritable, Text> edges = new BigDataByteArrayEdges<>();
+ edges.setConf(createConfiguration(Text.class));
+ edges.initialize();
+ List<Edge<LongWritable, Text>> inputEdges = new ArrayList<>();
+ for (long vertex = 0; vertex < 1000; vertex++) {
+ inputEdges.add(edge(vertex, String.format("VertexValue%d", vertex % 3)));
+ }
+ inputEdges.forEach(e -> edges.add(e));
+ assertEdges(inputEdges, edges);
+ }
+
+ @Test
+ public void testAddEdgesTwoPerBufferWithUnusedBytes()
+ {
+ //each edge is 16 bytes, 8 bytes remaining at the end of each buffer will be trimmed
+ BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+ createBigDataByteArrayEdges(40);
+ edges.initialize(40);
+ List<Edge<LongWritable, LongWritable>> inputEdges = new ArrayList<>();
+ for (long vertex = 0; vertex < 50; vertex++) {
+ inputEdges.add(edge(vertex, vertex % 3));
+ }
+ inputEdges.forEach(e -> edges.add(e));
+ assertEdges(inputEdges, edges);
+ edges.trim();
+ assertEdges(inputEdges, edges);
+ }
+
+ @Test
+ public void testRemoveEdgesIncludesCurrentBuffer()
+ {
+ //each edge is 16 bytes, 8 bytes remaining at the end of each buffer will be trimmed
+ BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+ createBigDataByteArrayEdges(40);
+ edges.initialize(40);
+ List<Edge<LongWritable, LongWritable>> inputEdges =
+ createEdges(edge(1, 1002), edge(2, 1003), // remove(2) - remove 1 edge, remove(1) - remove 1 edge
+ edge(3, 1004), edge(4, 1005), // remove(2) - 0 edges from buffer 2
+ edge(2, 1006), edge(2, 1007));// remove(2) - all edges from buffer 3,
+ // current buffer should revert to previous one
+
+ List<Edge<LongWritable, LongWritable>> expectedEdges = inputEdges.stream().filter(e -> e.getTargetVertexId().get() != 2)
+ .collect(Collectors.toList());
+ inputEdges.forEach(e -> edges.add(e));
+ assertEdges(inputEdges, edges);
+ edges.remove(new LongWritable(2));
+ assertEdges(expectedEdges, edges);
+
+ inputEdges.add(edge(6, 1008));
+ expectedEdges.add(inputEdges.get(inputEdges.size() - 1));
+ edges.add(inputEdges.get(inputEdges.size() - 1)); //confirm that add works after removal
+
+ assertEdges(expectedEdges, edges);
+
+ edges.remove(new LongWritable(5)); //remove item that doesn't exist
+ assertEdges(expectedEdges, edges);
+ edges.remove(new LongWritable(1));
+ expectedEdges = expectedEdges.stream().filter(e -> e.getTargetVertexId().get() != 1)
+ .collect(Collectors.toList());
+ assertEdges(expectedEdges, edges);
+ inputEdges.add(edge(7, 1009));
+ expectedEdges.add(inputEdges.get(inputEdges.size() - 1));
+ edges.add(inputEdges.get(inputEdges.size() - 1)); //confirm that add works after removal
+ assertEdges(expectedEdges, edges);
+ inputEdges.forEach(e -> edges.remove(e.getTargetVertexId())); // remove everything
+ assertEdges(Arrays.asList(), edges); //confirm that edges is empty
+
+ //verify that after removing all elements edges is in a state that add works correctly
+ inputEdges.forEach(e -> edges.add(e));
+ assertEdges(inputEdges, edges);
+ }
+ //endregion
+
+ //region read/write
+ @Test
+ public void testReadWriteExtendedDataInputOutputBuffers() throws IOException
+ {
+ // input is smaller than max, readFields will not deserialize edges
+ BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+ createBigDataByteArrayEdges(40);
+ edges.initialize(40);
+ List<Edge<LongWritable, LongWritable>> inputEdges = createEdges(edge(1, 2), edge(3, 4));
+ ExtendedDataInput dataInput = createDataInput(inputEdges);
+ edges.readFields(dataInput);
+ assertEdges(inputEdges, edges);
+ ExtendedDataOutput output = createConfiguration().createExtendedDataOutput();
+ edges.write(output);
+
+ // input is larger than max, readFields will deserialize edges and split them across multiple buffers
+ edges = createBigDataByteArrayEdges(16);
+ edges.initialize(16);
+ edges.readFields(createDataInput(output));
+ assertEdges(inputEdges, edges);
+
+ //input is larger than max, readFields will deserialize edges and split them across multiple buffers.
+ //make sure fluff at the end is ignored
+ dataInput = createDataInput(inputEdges);
+ edges.readFields(dataInput);
+ assertEdges(inputEdges, edges);
+ }
+
+ @Test
+ public void testReadWriteBigDataInputOutputBuffers() throws IOException
+ {
+ // input is smaller than max, readFields will not deserialize edges
+ BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+ createBigDataByteArrayEdges(40);
+ edges.initialize(40);
+ List<Edge<LongWritable, LongWritable>> inputEdges = createEdges(edge(1, 2), edge(3, 4));
+ DataInput dataInput = createDataInput(inputEdges);
+ edges.readFields(dataInput);
+ assertEdges(inputEdges, edges);
+ ExtendedDataOutput output = createConfiguration().createExtendedDataOutput();
+ edges.write(output);
+
+ // input is larger than max, readFields will deserialize edges and split them across multiple buffers
+ edges = createBigDataByteArrayEdges(16);
+ edges.initialize(16);
+ edges.readFields(createDataInput(output));
+ assertEdges(inputEdges, edges);
+ }
+ //endregion
+}