blob: 15d2bb0aad6f17bca90631dc7b95ea5535dc95f0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.graph;
import com.google.common.collect.Lists;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.ArrayListEdges;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.edge.HashMapEdges;
import org.apache.giraph.edge.HashMultimapEdges;
import org.apache.giraph.edge.LongDoubleArrayEdges;
import org.apache.giraph.edge.LongDoubleHashMapEdges;
import org.apache.giraph.edge.MutableEdge;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
import org.apache.giraph.utils.DynamicChannelBufferInputStream;
import org.apache.giraph.utils.DynamicChannelBufferOutputStream;
import org.apache.giraph.utils.EdgeIterables;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.junit.Before;
import org.junit.Test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Test {@link Vertex} functionality across the provided {@link org.apache.giraph.edge.OutEdges}
* classes.
*/
public class TestVertexAndEdges {
  /** Number of repetitions used when timing (de)serialization. */
  public static final int REPS = 100;
  /** {@link org.apache.giraph.edge.OutEdges} classes to be tested. */
  private final Collection<Class<? extends OutEdges>> edgesClasses =
      Lists.newArrayList();

  /**
   * Dummy concrete vertex.
   */
  public static class TestVertex extends Vertex<LongWritable, FloatWritable,
      DoubleWritable, LongWritable> {
    @Override
    public void compute(Iterable<LongWritable> messages) { }
  }

  /**
   * A basic {@link org.apache.giraph.edge.OutEdges} implementation that
   * doesn't provide any special functionality. Used to test the default
   * implementations of Vertex#getEdgeValue(), Vertex#getMutableEdges(), etc.
   */
  public static class TestOutEdges
      implements OutEdges<LongWritable, DoubleWritable> {
    /** Backing list of edges, kept in insertion order. */
    private List<Edge<LongWritable, DoubleWritable>> edgeList;

    @Override
    public void initialize(Iterable<Edge<LongWritable, DoubleWritable>> edges) {
      this.edgeList = Lists.newArrayList(edges);
    }

    @Override
    public void initialize(int capacity) {
      this.edgeList = Lists.newArrayListWithCapacity(capacity);
    }

    @Override
    public void initialize() {
      this.edgeList = Lists.newArrayList();
    }

    @Override
    public void add(Edge<LongWritable, DoubleWritable> edge) {
      edgeList.add(edge);
    }

    /**
     * Remove every edge pointing at the given target (parallel edges
     * included).
     *
     * @param targetVertexId Target vertex id whose edges should be removed
     */
    @Override
    public void remove(LongWritable targetVertexId) {
      for (Iterator<Edge<LongWritable, DoubleWritable>> edges =
          edgeList.iterator(); edges.hasNext();) {
        Edge<LongWritable, DoubleWritable> edge = edges.next();
        if (edge.getTargetVertexId().equals(targetVertexId)) {
          edges.remove();
        }
      }
    }

    @Override
    public int size() {
      return edgeList.size();
    }

    @Override
    public Iterator<Edge<LongWritable, DoubleWritable>> iterator() {
      return edgeList.iterator();
    }

    /**
     * Serialize as the edge count followed by (target id, value) pairs.
     *
     * @param out Output to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(edgeList.size());
      for (Edge<LongWritable, DoubleWritable> edge : edgeList) {
        edge.getTargetVertexId().write(out);
        edge.getValue().write(out);
      }
    }

    /**
     * Read the format produced by {@link #write(DataOutput)}, replacing any
     * edges currently held.
     *
     * @param in Input to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      int numEdges = in.readInt();
      initialize(numEdges);
      for (int i = 0; i < numEdges; ++i) {
        Edge<LongWritable, DoubleWritable> edge = EdgeFactory.createReusable(
            new LongWritable(), new DoubleWritable());
        WritableUtils.readEdge(in, edge);
        edgeList.add(edge);
      }
    }
  }

  /**
   * Register every {@link OutEdges} implementation exercised by the tests.
   */
  @Before
  public void setUp() {
    edgesClasses.add(TestOutEdges.class);
    edgesClasses.add(ByteArrayEdges.class);
    edgesClasses.add(ArrayListEdges.class);
    edgesClasses.add(HashMapEdges.class);
    edgesClasses.add(HashMultimapEdges.class);
    edgesClasses.add(LongDoubleArrayEdges.class);
    edgesClasses.add(LongDoubleHashMapEdges.class);
  }

  /**
   * Create an uninitialized {@link TestVertex} whose configuration uses the
   * given out-edges class.
   *
   * @param edgesClass OutEdges implementation to configure
   * @return New vertex created by the configuration
   */
  private Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
  instantiateVertex(Class<? extends OutEdges> edgesClass) {
    GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
    giraphConfiguration.setVertexClass(TestVertex.class);
    giraphConfiguration.setOutEdgesClass(edgesClass);
    ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
        new ImmutableClassesGiraphConfiguration(giraphConfiguration);
    return immutableClassesGiraphConfiguration.createVertex();
  }

  /**
   * Test vertex instantiation, initialization, and updating the vertex value.
   */
  @Test
  public void testVertexIdAndValue() {
    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
        instantiateVertex(ArrayListEdges.class);
    assertNotNull(vertex);
    vertex.initialize(new LongWritable(7), new FloatWritable(3.0f));
    assertEquals(7, vertex.getId().get());
    assertEquals(3.0f, vertex.getValue().get(), 0d);
    vertex.setValue(new FloatWritable(5.5f));
    assertEquals(5.5f, vertex.getValue().get(), 0d);
  }

  /**
   * Create an uninitialized out-edges instance of the given class, going
   * through the Giraph configuration as production code does.
   *
   * @param edgesClass OutEdges implementation to instantiate
   * @return New out-edges instance (not yet initialized)
   */
  public static OutEdges
  instantiateOutEdges(Class<? extends OutEdges> edgesClass) {
    GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
    // Needed to extract type arguments in ReflectionUtils.
    giraphConfiguration.setVertexClass(TestVertex.class);
    giraphConfiguration.setOutEdgesClass(edgesClass);
    ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
        new ImmutableClassesGiraphConfiguration(giraphConfiguration);
    return immutableClassesGiraphConfiguration.createOutEdges();
  }

  /**
   * Test the provided {@link org.apache.giraph.edge.OutEdges} implementations
   * for instantiation, initialization, edge addition, and edge removal.
   */
  @Test
  public void testEdges() {
    for (Class<? extends OutEdges> edgesClass : edgesClasses) {
      testEdgesClass(edgesClass);
    }
  }

  /**
   * Run the edge lookup/removal/update checks against one OutEdges class.
   *
   * @param edgesClass OutEdges implementation under test
   */
  private void testEdgesClass(Class<? extends OutEdges> edgesClass) {
    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
        instantiateVertex(edgesClass);
    OutEdges<LongWritable, DoubleWritable> outEdges =
        instantiateOutEdges(edgesClass);
    assertNotNull(outEdges);

    // Edges 1000..1 (descending), each with value 2 * target id.
    List<Edge<LongWritable, DoubleWritable>> edges = Lists.newLinkedList();
    for (int i = 1000; i > 0; --i) {
      edges.add(EdgeFactory.create(new LongWritable(i),
          new DoubleWritable(i * 2.0)));
    }
    outEdges.initialize(edges);
    vertex.initialize(new LongWritable(1), new FloatWritable(1), outEdges);

    assertEquals(20.0, vertex.getEdgeValue(new LongWritable(10)).get(), 0.0);
    assertEquals(1000, vertex.getNumEdges());
    for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
      assertEquals(edge.getTargetVertexId().get() * 2.0d,
          edge.getValue().get(), 0d);
    }

    vertex.removeEdges(new LongWritable(500));
    assertEquals(999, vertex.getNumEdges());
    for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
      // A JUnit assertion, not a Java `assert`: the latter is a no-op unless
      // the JVM is started with -ea, so the check would silently never run.
      assertTrue("Edge to removed target 500 is still present",
          edge.getTargetVertexId().get() != 500);
    }

    vertex.setEdgeValue(new LongWritable(10), new DoubleWritable(33.0));
    assertEquals(33.0, vertex.getEdgeValue(new LongWritable(10)).get(), 0);
  }

  /**
   * Test in-place edge mutations via the iterable returned by {@link
   * Vertex#getMutableEdges()}.
   */
  @Test
  public void testMutateEdges() {
    for (Class<? extends OutEdges> edgesClass : edgesClasses) {
      testMutateEdgesClass(edgesClass);
    }
  }

  /**
   * Run the mutable-iteration checks against one OutEdges class.
   *
   * @param edgesClass OutEdges implementation under test
   */
  private void testMutateEdgesClass(Class<? extends OutEdges> edgesClass) {
    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
        instantiateVertex(edgesClass);
    OutEdges<LongWritable, DoubleWritable> outEdges =
        instantiateOutEdges(edgesClass);
    outEdges.initialize();
    vertex.initialize(new LongWritable(0), new FloatWritable(0), outEdges);

    // Add 10 edges with id i, value i for i = 0..9
    for (int i = 0; i < 10; ++i) {
      vertex.addEdge(EdgeFactory.create(
          new LongWritable(i), new DoubleWritable(i)));
    }

    // Use the mutable iterable to multiply each edge value by 2
    for (MutableEdge<LongWritable, DoubleWritable> edge :
        vertex.getMutableEdges()) {
      edge.setValue(new DoubleWritable(edge.getValue().get() * 2));
    }

    // We should still have 10 edges
    assertEquals(10, vertex.getNumEdges());
    // The edge values should now be double the ids
    for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
      long id = edge.getTargetVertexId().get();
      double value = edge.getValue().get();
      assertEquals(id * 2, value, 0);
    }

    // Use the mutable iterator to remove edges with even id
    Iterator<MutableEdge<LongWritable, DoubleWritable>> edgeIt =
        vertex.getMutableEdges().iterator();
    while (edgeIt.hasNext()) {
      if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
        edgeIt.remove();
      }
    }

    // We should now have 5 edges
    assertEquals(5, vertex.getNumEdges());
    // The edge ids should be all odd
    for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
      assertEquals(1, edge.getTargetVertexId().get() % 2);
    }

    // Breaking iteration early should not make us lose edges.
    // This version uses repeated calls to next():
    Iterator<MutableEdge<LongWritable, DoubleWritable>> it =
        vertex.getMutableEdges().iterator();
    it.next();
    it.next();
    assertEquals(5, vertex.getNumEdges());

    // This version uses a for-each loop, and the break statement
    // (visits three edges before breaking):
    int i = 2;
    for (MutableEdge<LongWritable, DoubleWritable> edge :
        vertex.getMutableEdges()) {
      System.out.println(edge.toString());
      if (i-- == 0) {
        break;
      }
    }
    assertEquals(5, vertex.getNumEdges());

    // This version uses a normal, immutable iterable:
    i = 2;
    for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
      if (i-- == 0) {
        break;
      }
    }
    assertEquals(5, vertex.getNumEdges());
  }

  /**
   * Test {@link Vertex} and {@link org.apache.giraph.edge.OutEdges}
   * serialization.
   * @throws IOException
   */
  @Test
  public void testSerialize() throws IOException {
    for (Class<? extends OutEdges> edgesClass : edgesClasses) {
      testSerializeOutEdgesClass(edgesClass);
      testDynamicChannelBufferSerializeOutEdgesClass(edgesClass);
      testUnsafeSerializeOutEdgesClass(edgesClass);
    }
  }

  /**
   * Build the reference vertex used by the serialization tests: id 2,
   * value 3.0 and 200 edges (targets 200..1, value 2 * target).
   *
   * @param edgesClass OutEdges implementation to use
   * @return Fully initialized vertex
   */
  private Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
  buildVertex(Class<? extends OutEdges> edgesClass) {
    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
        instantiateVertex(edgesClass);
    OutEdges<LongWritable, DoubleWritable> outEdges =
        instantiateOutEdges(edgesClass);
    int edgesCount = 200;
    List<Edge<LongWritable, DoubleWritable>> edges =
        Lists.newArrayListWithCapacity(edgesCount);
    for (int i = edgesCount; i > 0; --i) {
      edges.add(EdgeFactory.create(new LongWritable(i),
          new DoubleWritable(i * 2.0)));
    }
    outEdges.initialize(edges);
    vertex.initialize(new LongWritable(2), new FloatWritable(3.0f),
        outEdges);
    return vertex;
  }

  /**
   * Build a deserialization target whose id, value and edges all differ from
   * {@link #buildVertex(Class)}. A matching round trip therefore proves that
   * deserialization actually overwrote the target's state; a target that
   * already equals the source would make the final assertions vacuous.
   *
   * @param edgesClass OutEdges implementation to use
   * @return Initialized vertex with id -1, value -1 and no edges
   */
  private Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
  buildEmptyVertex(Class<? extends OutEdges> edgesClass) {
    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
        instantiateVertex(edgesClass);
    OutEdges<LongWritable, DoubleWritable> outEdges =
        instantiateOutEdges(edgesClass);
    outEdges.initialize();
    vertex.initialize(new LongWritable(-1), new FloatWritable(-1f), outEdges);
    return vertex;
  }

  /**
   * Print the average time and throughput of a (de)serialization run.
   *
   * @param testName Prefix identifying the calling test
   * @param action "Serializing" or "Deserializing"
   * @param nanos Average nanoseconds per repetition
   * @param numBytes Serialized size in bytes
   * @param edgesClass OutEdges implementation being measured
   */
  private static void printThroughput(String testName, String action,
      long nanos, int numBytes, Class<?> edgesClass) {
    System.out.println(testName + ": " + action + " took " + nanos +
        " ns for " + numBytes + " bytes " +
        (numBytes * 1f * Time.NS_PER_SECOND / nanos) +
        " bytes / sec for " + edgesClass.getName());
  }

  /**
   * Assert that two vertices have equal ids, values and out-edges.
   *
   * @param expected Original vertex
   * @param actual Deserialized vertex
   */
  private static void assertSameVertex(
      Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
          expected,
      Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
          actual) {
    assertEquals(expected.getId(), actual.getId());
    assertEquals(expected.getValue(), actual.getValue());
    assertTrue(EdgeIterables.sameEdges(expected.getEdges(),
        actual.getEdges()));
  }

  /**
   * Round-trip a vertex through a byte array and time both directions.
   *
   * @param edgesClass OutEdges implementation under test
   */
  private void testSerializeOutEdgesClass(
      Class<? extends OutEdges> edgesClass) {
    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
        buildVertex(edgesClass);

    long serializeNanos = 0;
    byte[] byteArray = null;
    for (int i = 0; i < REPS; ++i) {
      long serializeNanosStart = SystemTime.get().getNanoseconds();
      byteArray = WritableUtils.writeVertexToByteArray(
          vertex, false, vertex.getConf());
      serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
          serializeNanosStart);
    }
    serializeNanos /= REPS;
    printThroughput("testSerialize", "Serializing", serializeNanos,
        byteArray.length, edgesClass);

    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
        readVertex = buildEmptyVertex(edgesClass);
    long deserializeNanos = 0;
    for (int i = 0; i < REPS; ++i) {
      long deserializeNanosStart = SystemTime.get().getNanoseconds();
      WritableUtils.reinitializeVertexFromByteArray(byteArray, readVertex,
          false, readVertex.getConf());
      deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
          deserializeNanosStart);
    }
    deserializeNanos /= REPS;
    printThroughput("testSerialize", "Deserializing", deserializeNanos,
        byteArray.length, edgesClass);

    assertSameVertex(vertex, readVertex);
  }

  /**
   * Round-trip a vertex through a dynamic channel buffer and time both
   * directions.
   *
   * @param edgesClass OutEdges implementation under test
   * @throws IOException if (de)serialization fails
   */
  private void testDynamicChannelBufferSerializeOutEdgesClass(
      Class<? extends OutEdges> edgesClass)
      throws IOException {
    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
        buildVertex(edgesClass);

    long serializeNanos = 0;
    DynamicChannelBufferOutputStream outputStream = null;
    for (int i = 0; i < REPS; ++i) {
      long serializeNanosStart = SystemTime.get().getNanoseconds();
      outputStream = new DynamicChannelBufferOutputStream(32);
      WritableUtils.writeVertexToDataOutput(outputStream, vertex,
          vertex.getConf());
      serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
          serializeNanosStart);
    }
    serializeNanos /= REPS;
    int numBytes = outputStream.getDynamicChannelBuffer().writerIndex();
    printThroughput("testDynamicChannelBufferSerializeOutEdgesClass",
        "Serializing", serializeNanos, numBytes, edgesClass);

    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
        readVertex = buildEmptyVertex(edgesClass);
    long deserializeNanos = 0;
    for (int i = 0; i < REPS; ++i) {
      long deserializeNanosStart = SystemTime.get().getNanoseconds();
      DynamicChannelBufferInputStream inputStream =
          new DynamicChannelBufferInputStream(
              outputStream.getDynamicChannelBuffer());
      WritableUtils.reinitializeVertexFromDataInput(
          inputStream, readVertex, readVertex.getConf());
      deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
          deserializeNanosStart);
      // Rewind so the next repetition reads the same bytes again.
      outputStream.getDynamicChannelBuffer().readerIndex(0);
    }
    deserializeNanos /= REPS;
    printThroughput("testDynamicChannelBufferSerializeOutEdgesClass",
        "Deserializing", deserializeNanos,
        outputStream.getDynamicChannelBuffer().writerIndex(), edgesClass);

    assertSameVertex(vertex, readVertex);
  }

  /**
   * Round-trip a vertex through the unsafe byte-array streams and time both
   * directions.
   *
   * @param edgesClass OutEdges implementation under test
   * @throws IOException if (de)serialization fails
   */
  private void testUnsafeSerializeOutEdgesClass(
      Class<? extends OutEdges> edgesClass)
      throws IOException {
    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable> vertex =
        buildVertex(edgesClass);

    long serializeNanos = 0;
    UnsafeByteArrayOutputStream outputStream = null;
    for (int i = 0; i < REPS; ++i) {
      long serializeNanosStart = SystemTime.get().getNanoseconds();
      outputStream = new UnsafeByteArrayOutputStream(32);
      WritableUtils.writeVertexToDataOutput(outputStream, vertex,
          vertex.getConf());
      serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
          serializeNanosStart);
    }
    serializeNanos /= REPS;
    printThroughput("testUnsafeSerializeOutEdgesClass", "Serializing",
        serializeNanos, outputStream.getPos(), edgesClass);

    Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
        readVertex = buildEmptyVertex(edgesClass);
    long deserializeNanos = 0;
    for (int i = 0; i < REPS; ++i) {
      long deserializeNanosStart = SystemTime.get().getNanoseconds();
      UnsafeByteArrayInputStream inputStream = new UnsafeByteArrayInputStream(
          outputStream.getByteArray(), 0, outputStream.getPos());
      WritableUtils.reinitializeVertexFromDataInput(
          inputStream, readVertex, readVertex.getConf());
      deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
          deserializeNanosStart);
    }
    deserializeNanos /= REPS;
    printThroughput("testUnsafeSerializeOutEdgesClass", "Deserializing",
        deserializeNanos, outputStream.getPos(), edgesClass);

    assertSameVertex(vertex, readVertex);
  }
}