closes #147
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/ b/giraph-core/src/main/java/org/apache/giraph/edge/
new file mode 100644
index 0000000..bca93eb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/
@@ -0,0 +1,306 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.edge;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.Trimmable;
+import org.apache.giraph.utils.WritableUtils;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+ * {@link OutEdges} implementation backed by a list of {@link ExtendedDataOutput}.
+ * Parallel edges are allowed.
+ * This implementation is similar to {@link ByteArrayEdges} except it is not limited by size of underlying array
+ * and will allow adding new edges until machine runs out of memory
+ * Note: this implementation is optimized for space usage,
+ * but edge removals are expensive.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class BigDataByteArrayEdges <I extends WritableComparable, E extends Writable>
+        extends ConfigurableOutEdges<I, E>
+        implements ReuseObjectsOutEdges<I, E>, Trimmable
+    private static final int DEFAULT_MAX_BUFFER_SIZE = Integer.MAX_VALUE;
+    /** maximum buffer size. Sometimes it will be exceeded by the last edge, if last edge is bigger in size than anything seen before **/
+    private final int maxBufferSize;
+    /** expected initial buffer capacity. Use implementation default is empty **/
+    private Optional<Integer> capacity = Optional.empty();
+    /** Current serialized edges. */
+    private ExtendedDataOutput currentDataOutput = null;
+    /** All serialized edges. */
+    private List<ExtendedDataOutput> dataOutputs;
+    /** Number of edges. */
+    private int edgeCount = 0;
+    /** Temporary buffer used for one edge serialization */
+    private ExtendedDataOutput tempDataOutput;
+    @VisibleForTesting
+    BigDataByteArrayEdges(int maxBufferSize) {
+        this.maxBufferSize = maxBufferSize;
+    }
+    public BigDataByteArrayEdges()
+    {
+    }
+    @Override
+    public void initialize(Iterable<Edge<I, E>> edges) {
+        reset(Optional.empty());
+        for (Edge<I, E> edge : edges) {
+            add(edge);
+        }
+    }
+    @Override
+    public void initialize(int capacity) {
+        reset(Optional.of(Math.min(capacity, maxBufferSize)));
+    }
+    @Override
+    public void initialize() {
+        this.reset(this.capacity); //don't reset capacity
+    }
+    private void reset(Optional<Integer> capacity) {
+        this.capacity = capacity;
+        this.edgeCount = 0;
+        this.currentDataOutput = null;
+        this.dataOutputs = new ArrayList<>();
+        this.tempDataOutput = getConf().createExtendedDataOutput();
+        createNextDataOutput();
+    }
+    /**
+     * Return whether maximum buffer size of current data output be exceeded
+     * if byteCount bytes are added to it
+     */
+    private boolean isCurrentDataOutputFull(int byteCount) {
+        return currentDataOutput.getPos() + byteCount > maxBufferSize;
+    }
+    /**
+     * Create and return ExtendedDataOutput with specified capacity settings
+     */
+    private ExtendedDataOutput createExtendedDataOutput() {
+        return capacity.isPresent() ?
+                getConf().createExtendedDataOutput(capacity.get()) :
+                getConf().createExtendedDataOutput();
+    }
+    /**
+     * Create new current data output
+     */
+    private void createNextDataOutput() {
+        currentDataOutput = createExtendedDataOutput();
+        dataOutputs.add(currentDataOutput);
+    }
+    @Override
+    public void add(Edge<I, E> edge) {
+        try {
+            tempDataOutput.reset();
+            WritableUtils.writeEdge(tempDataOutput, edge);
+            int edgeByteSize = tempDataOutput.getPos();
+            if (isCurrentDataOutputFull(edgeByteSize)) {
+                createNextDataOutput();
+            }
+            currentDataOutput.write(tempDataOutput.getByteArray(), 0, edgeByteSize);
+            edgeCount++;
+        }
+        catch (IOException e) {
+            throw new IllegalStateException("add: Failed to write to the new " +
+                    "byte array");
+        }
+    }
+    @Override
+    public void remove(I targetVertexId) {
+        // Note that this is very expensive (deserializes all edges).
+        List<ExtendedDataOutput> outputsToRemove = new LinkedList<>();
+        ReusableEdge<I, E> representativeEdge =
+                getConf().createReusableEdge();
+        for (int outputIdx = 0; outputIdx < dataOutputs.size(); outputIdx++) {
+            ExtendedDataOutput output = dataOutputs.get(outputIdx);
+            ExtendedDataInput input = getConf().createExtendedDataInput(output.toByteArray());
+            ExtendedDataOutput updatedOutput = createExtendedDataOutput();
+            int remainingCount = 0;
+            boolean edgesRemoved = false;
+            while(!input.endOfInput()) {
+                try {
+                    WritableUtils.readEdge(input, representativeEdge);
+                    if (representativeEdge.getTargetVertexId().equals(targetVertexId)) {
+                        edgesRemoved = true;
+                        edgeCount--;
+                    }
+                    else {
+                        remainingCount++;
+                        WritableUtils.writeEdge(updatedOutput, representativeEdge);
+                    }
+                }
+                catch (IOException ioe) {
+                    throw new RuntimeException(ioe);
+                }
+            }
+            if (edgesRemoved) {
+                if (remainingCount == 0) {
+                    outputsToRemove.add(output);
+                }
+                else {
+                    dataOutputs.set(outputIdx, updatedOutput);
+                }
+            }
+        }
+        if (!outputsToRemove.isEmpty()) {
+            dataOutputs.removeAll(outputsToRemove);
+        }
+        if (dataOutputs.isEmpty()) {
+            createNextDataOutput();
+        }
+        else {
+            currentDataOutput = dataOutputs.get(dataOutputs.size() - 1);
+        }
+    }
+    @Override
+    public int size() {
+        return edgeCount;
+    }
+    @Override
+    public Iterator<Edge<I, E>> iterator() {
+        return new BigDataByteArrayEdgeIterator(dataOutputs.iterator());
+    }
+    /**
+     *  Compact data output at given data output index
+     */
+    private void trim(int dataOutputIdx) {
+        if (dataOutputIdx >= dataOutputs.size()) {
+            return;
+        }
+        ExtendedDataOutput output = dataOutputs.get(dataOutputIdx);
+        if (output.getPos() < output.getByteArray().length) {
+            byte [] trimmedBytes = output.toByteArray();
+            output = getConf().createExtendedDataOutput(trimmedBytes, trimmedBytes.length);
+            dataOutputs.set(dataOutputIdx, output);
+            if (dataOutputIdx == dataOutputs.size() - 1) {
+                currentDataOutput = output;
+            }
+        }
+    }
+    /**
+     * Compact all data output buffers
+     * by replacing every data output to buffer
+     * whose size exceeds current position with new buffer whose size fits the data
+     * and copying data into it
+     */
+    @Override
+    public void trim() {
+        if (currentDataOutput == null) {
+            return;
+        }
+        for (int dataOutputIdx = 0; dataOutputIdx < dataOutputs.size(); dataOutputIdx++) {
+            trim(dataOutputIdx);
+        }
+    }
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(dataOutputs.size());
+        for (ExtendedDataOutput dataOutput : dataOutputs) {
+            out.writeInt(dataOutput.getPos());
+            out.write(dataOutput.getByteArray(), 0, dataOutput.getPos());
+        }
+        out.writeInt(edgeCount);
+    }
+    @Override
+    public void readFields(DataInput in)
+            throws IOException {
+        initialize();
+        int dataOutputsCount = in.readInt();
+        for (int dataOutputIdx = 0; dataOutputIdx < dataOutputsCount; dataOutputIdx++) {
+            byte [] bytes = new byte[in.readInt()];
+            in.readFully(bytes, 0, bytes.length);
+            currentDataOutput.write(bytes);
+        }
+        edgeCount = in.readInt();
+    }
+    private class BigDataByteArrayEdgeIterator
+            extends UnmodifiableIterator<Edge<I, E>> {
+        private final Iterator<ExtendedDataOutput> byteArrayEdgesListIterator;
+        ExtendedDataInput currentInput = null;
+        /** Representative edge object. */
+        private ReusableEdge<I, E> representativeEdge =
+                getConf().createReusableEdge();
+        private BigDataByteArrayEdgeIterator(Iterator<ExtendedDataOutput> byteArrayEdgesListIterator) {
+            this.byteArrayEdgesListIterator = byteArrayEdgesListIterator;
+        }
+        private boolean inputHasNext() {
+            return currentInput != null && !currentInput.endOfInput();
+        }
+        @Override
+        public boolean hasNext()
+        {
+            while (!inputHasNext() && byteArrayEdgesListIterator.hasNext()) {
+                ExtendedDataOutput currentOutput =;
+                currentInput = getConf().createExtendedDataInput(currentOutput.toByteArray());
+            }
+            return inputHasNext();
+        }
+        @Override
+        public Edge<I, E> next()
+        {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            try {
+                WritableUtils.readEdge(currentInput, representativeEdge);
+            } catch (IOException e) {
+                throw new IllegalStateException("next: Failed on pos " +
+                        currentInput.getPos() + " edge " + representativeEdge);
+            }
+            return representativeEdge;
+        }
+    }
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/ b/giraph-core/src/test/java/org/apache/giraph/edge/
new file mode 100644
index 0000000..fe1ce40
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/
@@ -0,0 +1,321 @@
+package org.apache.giraph.edge;
+import junit.framework.TestCase;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.WritableUtils;
+import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+public class BigDataByteArrayEdgesTest
+        extends TestCase
+    private static ImmutableClassesGiraphConfiguration createConfiguration() {
+        return createConfiguration(LongWritable.class);
+    }
+    private static <T extends Writable> ImmutableClassesGiraphConfiguration createConfiguration(Class<T> clazz) {
+        GiraphConfiguration conf = new GiraphConfiguration();
+        GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class);
+        GiraphConstants.EDGE_VALUE_CLASS.set(conf, clazz);
+        return new ImmutableClassesGiraphConfiguration<>(conf);
+    }
+    private static BigDataByteArrayEdges<LongWritable, LongWritable> createBigDataByteArrayEdges(int maxBufferSize) {
+        BigDataByteArrayEdges<LongWritable, LongWritable> edges = new BigDataByteArrayEdges<>(maxBufferSize);
+        edges.setConf(createConfiguration());
+        return edges;
+    }
+    private static List<Edge<LongWritable, LongWritable>> createEdges(Edge<LongWritable, LongWritable> ...edges) {
+        return new ArrayList<>(Arrays.asList(edges));
+    }
+    private static ExtendedDataOutput createDataOutput(List<Edge<LongWritable, LongWritable>> edges) throws IOException {
+        final int NUM_FLUFF_INTS = 8;
+        ExtendedDataOutput dataOutput = createConfiguration().createExtendedDataOutput();
+        dataOutput.writeInt(1); // 1 data output in big data set
+        dataOutput.writeInt(0); // byte count (to be filled later after edges have been written)
+        edges.forEach(e ->  {
+            try {
+                WritableUtils.writeEdge(dataOutput, e);
+            }
+            catch (IOException exc) {
+            }
+        });
+        dataOutput.writeInt(Integer.BYTES, dataOutput.getPos() - 2*Integer.BYTES); //update byte count with actual number
+        dataOutput.writeInt(edges.size());
+        for (int i = 0; i < NUM_FLUFF_INTS; i++) {
+            dataOutput.write(i); //some fluff at the end - make sure BigDataByteArrayEdges can handle it
+        }
+        return dataOutput;
+    }
+    private static ExtendedDataInput createDataInput(ExtendedDataOutput output) {
+        return new ImmutableClassesGiraphConfiguration<>(createConfiguration())
+                .createExtendedDataInput(output.toByteArray());
+    }
+    private static ExtendedDataInput createDataInput(List<Edge<LongWritable, LongWritable>> edges) throws IOException {
+        return createDataInput(createDataOutput(edges));
+    }
+    private static Edge<LongWritable, LongWritable> edge(long vertex, long value) {
+        DefaultEdge edge = new DefaultEdge<>();
+        edge.setTargetVertexId(new LongWritable(vertex));
+        edge.setValue(new LongWritable(value));
+        return edge;
+    }
+    private static Edge<LongWritable, Text> edge(long vertex, String value) {
+        DefaultEdge edge = new DefaultEdge<>();
+        edge.setTargetVertexId(new LongWritable(vertex));
+        edge.setValue(new Text(value));
+        return edge;
+    }
+    private static <T extends Writable> void assertEdges(List<Edge<LongWritable, T>> expectedList,
+            BigDataByteArrayEdges<LongWritable, T> edges) {
+        assertEquals(expectedList.size(), edges.size());
+        Iterator<Edge<LongWritable, T>> expected = expectedList.iterator();
+        Iterator<Edge<LongWritable, T>> actual = edges.iterator();
+        while (actual.hasNext() && expected.hasNext()) {
+            Edge<LongWritable, T> expectedValue =;
+            Edge<LongWritable, T> actualValue =;
+            assertEquals(expectedValue.getTargetVertexId(), actualValue.getTargetVertexId());
+            assertEquals(expectedValue.getValue(), actualValue.getValue());
+        }
+        assertTrue("Not equal size", !actual.hasNext() && !expected.hasNext());
+    }
+    //region test initialization
+    @Test
+    public void testInitializeEmpty()
+    {
+        BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+                createBigDataByteArrayEdges(8);
+        edges.initialize();
+        assertEdges(Arrays.asList(), edges);
+    }
+    @Test
+    public void testInitializeInitialCapacity()
+    {
+        BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+                createBigDataByteArrayEdges(64);
+        edges.initialize(32);
+        assertEdges(Arrays.asList(), edges);
+    }
+    @Test
+    public void testInitializeWithEdges()
+    {
+        BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+                createBigDataByteArrayEdges(10);
+        List<Edge<LongWritable, LongWritable>> initEdges =
+                createEdges(edge(1, 4), edge(2, 5), edge(3, 6));
+        edges.initialize(initEdges);
+        assertEdges(initEdges, edges);
+    }
+    @Test
+    public void testInitializeMultipleTimes()
+    {
+        BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+                createBigDataByteArrayEdges(10);
+        List<Edge<LongWritable, LongWritable>> initEdges =
+                createEdges(edge(1, 4), edge(2, 5), edge(3, 6));
+        edges.initialize(initEdges);
+        assertEdges(initEdges, edges);
+        edges.initialize();
+        assertEdges(Arrays.asList(), edges); //empty after reinitialization
+        initEdges.forEach(e -> edges.add(e));
+        assertEdges(initEdges, edges);
+    }
+    //endregion
+    //region add/remove edges
+    @Test
+    public void testAddEdgeThatExceedsMaxBufferSize()
+    {
+        BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+                createBigDataByteArrayEdges(3);
+        edges.initialize(3);
+        List<Edge<LongWritable, LongWritable>> inputEdges = createEdges(edge(1, 3), edge(2, 4));
+        inputEdges.forEach(e -> edges.add(e));
+        assertEdges(inputEdges, edges);
+    }
+    @Test
+    public void testAddEdgesTwoPerBuffer()
+    {
+        BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+                //each edge is 16 bytes
+                createBigDataByteArrayEdges(32);
+        edges.initialize();
+        List<Edge<LongWritable, LongWritable>> inputEdges = new ArrayList<>();
+        int vertex;
+        for (vertex = 0; vertex < 49; vertex++) { // make sure last data output is not full
+            inputEdges.add(edge(vertex, vertex % 3));
+        }
+        inputEdges.forEach(e -> edges.add(e));
+        assertEdges(inputEdges, edges);
+        edges.trim(); //verify that when last data output is trimmed, current is update correctly
+        assertEdges(inputEdges, edges);
+        inputEdges.add(edge(vertex, vertex % 3));
+        edges.add(inputEdges.get(inputEdges.size() - 1));
+        assertEdges(inputEdges, edges);
+    }
+    @Test
+    public void testEdgesOfDifferentSize()
+    {
+        BigDataByteArrayEdges<LongWritable, Text> edges = new BigDataByteArrayEdges<>(32);
+        edges.setConf(createConfiguration(Text.class));
+        edges.initialize();
+        List<Edge<LongWritable, Text>> inputEdges = new ArrayList<>();
+        inputEdges.add(edge(1, "Short String"));
+        inputEdges.add(edge(1, "Very Very Long, Super Long String"));
+        inputEdges.forEach(e -> edges.add(e));
+        assertEdges(inputEdges, edges);
+    }
+    @Test
+    public void testEdgesOfDefaultMaxSize()
+    {
+        BigDataByteArrayEdges<LongWritable, Text> edges = new BigDataByteArrayEdges<>();
+        edges.setConf(createConfiguration(Text.class));
+        edges.initialize();
+        List<Edge<LongWritable, Text>> inputEdges = new ArrayList<>();
+        for (long vertex = 0; vertex < 1000; vertex++) {
+            inputEdges.add(edge(vertex, String.format("VertexValue%d", vertex % 3)));
+        }
+        inputEdges.forEach(e -> edges.add(e));
+        assertEdges(inputEdges, edges);
+    }
+    @Test
+    public void testAddEdgesTwoPerBufferWithUnusedBytes()
+    {
+        //each edge is 16 bytes, 8 bytes remaining at the end of each buffer will be trimmed
+        BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+                createBigDataByteArrayEdges(40);
+        edges.initialize(40);
+        List<Edge<LongWritable, LongWritable>> inputEdges = new ArrayList<>();
+        for (long vertex = 0; vertex < 50; vertex++) {
+            inputEdges.add(edge(vertex, vertex % 3));
+        }
+        inputEdges.forEach(e -> edges.add(e));
+        assertEdges(inputEdges, edges);
+        edges.trim();
+        assertEdges(inputEdges, edges);
+    }
+    @Test
+    public void testRemoveEdgesIncludesCurrentBuffer()
+    {
+        //each edge is 16 bytes, 8 bytes remaining at the end of each buffer will be trimmed
+        BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+                createBigDataByteArrayEdges(40);
+        edges.initialize(40);
+        List<Edge<LongWritable, LongWritable>> inputEdges =
+                createEdges(edge(1, 1002), edge(2, 1003), // remove(2) - remove 1 edge, remove(1) - remove 1 edge
+                        edge(3, 1004), edge(4, 1005), // remove(2) -  0 edges from buffer 2
+                        edge(2, 1006), edge(2, 1007));// remove(2) -  all edges from buffer 3,
+        // current buffer should revert to previous one
+        List<Edge<LongWritable, LongWritable>> expectedEdges = -> e.getTargetVertexId().get() != 2)
+                .collect(Collectors.toList());
+        inputEdges.forEach(e -> edges.add(e));
+        assertEdges(inputEdges, edges);
+        edges.remove(new LongWritable(2));
+        assertEdges(expectedEdges, edges);
+        inputEdges.add(edge(6, 1008));
+        expectedEdges.add(inputEdges.get(inputEdges.size() - 1));
+        edges.add(inputEdges.get(inputEdges.size() - 1)); //confirm that add works after removal
+        assertEdges(expectedEdges, edges);
+        edges.remove(new LongWritable(5)); //remove item that doesn't exist
+        assertEdges(expectedEdges, edges);
+        edges.remove(new LongWritable(1));
+        expectedEdges = -> e.getTargetVertexId().get() != 1)
+                .collect(Collectors.toList());
+        assertEdges(expectedEdges, edges);
+        inputEdges.add(edge(7, 1009));
+        expectedEdges.add(inputEdges.get(inputEdges.size() - 1));
+        edges.add(inputEdges.get(inputEdges.size() - 1)); //confirm that add works after removal
+        assertEdges(expectedEdges, edges);
+        inputEdges.forEach(e -> edges.remove(e.getTargetVertexId())); // remove everything
+        assertEdges(Arrays.asList(), edges);  //confirm that edges is empty
+        //verify that after removing all elements edges is in a state that add works correctly
+        inputEdges.forEach(e -> edges.add(e));
+        assertEdges(inputEdges, edges);
+    }
+    //endregion
+    //region read/write
+    @Test
+    public void testReadWriteExtendedDataInputOutputBuffers() throws IOException
+    {
+        // input is smaller than max, readFields will not deserialize edges
+        BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+                createBigDataByteArrayEdges(40);
+        edges.initialize(40);
+        List<Edge<LongWritable, LongWritable>> inputEdges = createEdges(edge(1, 2), edge(3, 4));
+        ExtendedDataInput dataInput = createDataInput(inputEdges);
+        edges.readFields(dataInput);
+        assertEdges(inputEdges, edges);
+        ExtendedDataOutput output = createConfiguration().createExtendedDataOutput();
+        edges.write(output);
+        // input is larger than max, readFields will deserialize edges and split them across multiple buffers
+        edges = createBigDataByteArrayEdges(16);
+        edges.initialize(16);
+        edges.readFields(createDataInput(output));
+        assertEdges(inputEdges, edges);
+        //input is larger than max, readFields will deserialize edges and split them across multiple buffers.
+        //make sure fluff at the end is ignored
+        dataInput = createDataInput(inputEdges);
+        edges.readFields(dataInput);
+        assertEdges(inputEdges, edges);
+    }
+    @Test
+    public void testReadWriteBigDataInputOutputBuffers() throws IOException
+    {
+        // input is smaller than max, readFields will not deserialize edges
+        BigDataByteArrayEdges<LongWritable, LongWritable> edges =
+                createBigDataByteArrayEdges(40);
+        edges.initialize(40);
+        List<Edge<LongWritable, LongWritable>> inputEdges = createEdges(edge(1, 2), edge(3, 4));
+        DataInput dataInput = createDataInput(inputEdges);
+        edges.readFields(dataInput);
+        assertEdges(inputEdges, edges);
+        ExtendedDataOutput output = createConfiguration().createExtendedDataOutput();
+        edges.write(output);
+        // input is larger than max, readFields will deserialize edges and split them across multiple buffers
+        edges = createBigDataByteArrayEdges(16);
+        edges.initialize(16);
+        edges.readFields(createDataInput(output));
+        assertEdges(inputEdges, edges);
+    }
+    //endregion