| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.giraph.edge; |
| |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.UnmodifiableIterator; |
| import org.apache.giraph.utils.ExtendedDataInput; |
| import org.apache.giraph.utils.ExtendedDataOutput; |
| import org.apache.giraph.utils.Trimmable; |
| import org.apache.giraph.utils.WritableUtils; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.WritableComparable; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| |
| /** |
| * {@link OutEdges} implementation backed by a byte array. |
| * Parallel edges are allowed. |
| * Note: this implementation is optimized for space usage, |
| * but edge removals are expensive. |
| * |
| * @param <I> Vertex id |
| * @param <E> Edge value |
| */ |
| public class ByteArrayEdges<I extends WritableComparable, E extends Writable> |
| extends ConfigurableOutEdges<I, E> |
| implements ReuseObjectsOutEdges<I, E>, Trimmable { |
| /** Serialized edges. */ |
| private byte[] serializedEdges; |
| /** Number of bytes used in serializedEdges. */ |
| private int serializedEdgesBytesUsed; |
| /** Number of edges. */ |
| private int edgeCount; |
| |
| @Override |
| public void initialize(Iterable<Edge<I, E>> edges) { |
| ExtendedDataOutput extendedOutputStream = |
| getConf().createExtendedDataOutput(); |
| for (Edge<I, E> edge : edges) { |
| try { |
| WritableUtils.writeEdge(extendedOutputStream, edge); |
| } catch (IOException e) { |
| throw new IllegalStateException("initialize: Failed to serialize " + |
| edge); |
| } |
| ++edgeCount; |
| } |
| serializedEdges = extendedOutputStream.getByteArray(); |
| serializedEdgesBytesUsed = extendedOutputStream.getPos(); |
| } |
| |
| @Override |
| public void initialize(int capacity) { |
| // We have no way to know the size in bytes used by a certain |
| // number of edges. |
| initialize(); |
| } |
| |
| @Override |
| public void initialize() { |
| // No-op: no need to initialize the byte-array if there are no edges, |
| // since add() and iterator() work fine with a null buffer. |
| } |
| |
| @Override |
| public void add(Edge<I, E> edge) { |
| ExtendedDataOutput extendedDataOutput = |
| getConf().createExtendedDataOutput( |
| serializedEdges, serializedEdgesBytesUsed); |
| try { |
| WritableUtils.writeEdge(extendedDataOutput, edge); |
| } catch (IOException e) { |
| throw new IllegalStateException("add: Failed to write to the new " + |
| "byte array"); |
| } |
| serializedEdges = extendedDataOutput.getByteArray(); |
| serializedEdgesBytesUsed = extendedDataOutput.getPos(); |
| ++edgeCount; |
| } |
| |
| @Override |
| public void remove(I targetVertexId) { |
| // Note that this is very expensive (deserializes all edges). |
| ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator(); |
| List<Integer> foundStartOffsets = new LinkedList<Integer>(); |
| List<Integer> foundEndOffsets = new LinkedList<Integer>(); |
| int lastStartOffset = 0; |
| while (iterator.hasNext()) { |
| Edge<I, E> edge = iterator.next(); |
| if (edge.getTargetVertexId().equals(targetVertexId)) { |
| foundStartOffsets.add(lastStartOffset); |
| foundEndOffsets.add(iterator.extendedDataInput.getPos()); |
| --edgeCount; |
| } |
| lastStartOffset = iterator.extendedDataInput.getPos(); |
| } |
| foundStartOffsets.add(serializedEdgesBytesUsed); |
| |
| Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator(); |
| Integer foundStartOffset = foundStartOffsetIter.next(); |
| for (Integer foundEndOffset : foundEndOffsets) { |
| Integer nextFoundStartOffset = foundStartOffsetIter.next(); |
| System.arraycopy(serializedEdges, foundEndOffset, |
| serializedEdges, foundStartOffset, |
| nextFoundStartOffset - foundEndOffset); |
| serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset; |
| foundStartOffset = nextFoundStartOffset; |
| } |
| } |
| |
| @Override |
| public int size() { |
| return edgeCount; |
| } |
| |
| @Override |
| public void trim() { |
| if (serializedEdges != null && |
| serializedEdges.length > serializedEdgesBytesUsed) { |
| serializedEdges = |
| Arrays.copyOf(serializedEdges, serializedEdgesBytesUsed); |
| } |
| } |
| |
| /** |
| * Iterator that reuses the same Edge object. |
| */ |
| private class ByteArrayEdgeIterator |
| extends UnmodifiableIterator<Edge<I, E>> { |
| /** Input for processing the bytes */ |
| private ExtendedDataInput extendedDataInput = |
| getConf().createExtendedDataInput( |
| serializedEdges, 0, serializedEdgesBytesUsed); |
| /** Representative edge object. */ |
| private ReusableEdge<I, E> representativeEdge = |
| getConf().createReusableEdge(); |
| |
| @Override |
| public boolean hasNext() { |
| return serializedEdges != null && !extendedDataInput.endOfInput(); |
| } |
| |
| @Override |
| public Edge<I, E> next() { |
| try { |
| WritableUtils.readEdge(extendedDataInput, representativeEdge); |
| } catch (IOException e) { |
| throw new IllegalStateException("next: Failed on pos " + |
| extendedDataInput.getPos() + " edge " + representativeEdge); |
| } |
| return representativeEdge; |
| } |
| } |
| |
| @Override |
| public Iterator<Edge<I, E>> iterator() { |
| if (edgeCount == 0) { |
| return Iterators.emptyIterator(); |
| } else { |
| return new ByteArrayEdgeIterator(); |
| } |
| } |
| |
| @Override |
| public void readFields(DataInput in) throws IOException { |
| serializedEdgesBytesUsed = in.readInt(); |
| if (serializedEdgesBytesUsed > 0) { |
| // Only create a new buffer if the old one isn't big enough |
| if (serializedEdges == null || |
| serializedEdgesBytesUsed > serializedEdges.length) { |
| serializedEdges = new byte[serializedEdgesBytesUsed]; |
| } |
| in.readFully(serializedEdges, 0, serializedEdgesBytesUsed); |
| } |
| edgeCount = in.readInt(); |
| } |
| |
| @Override |
| public void write(DataOutput out) throws IOException { |
| out.writeInt(serializedEdgesBytesUsed); |
| if (serializedEdgesBytesUsed > 0) { |
| out.write(serializedEdges, 0, serializedEdgesBytesUsed); |
| } |
| out.writeInt(edgeCount); |
| } |
| } |