blob: c8641ccc5d5cd692a1e529612f7d48bdfc583450 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.lib;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import junit.framework.TestCase;
import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends TestCase {
private RecordReader<LongWritable, Text> rr;
private Configuration conf;
private TaskAttemptContext tac;
private GraphState<Text, DoubleWritable, DoubleWritable, BooleanWritable> graphState;
public void setUp() throws IOException, InterruptedException {
rr = mock(RecordReader.class);
when(rr.nextKeyValue()).thenReturn(true).thenReturn(false);
conf = new Configuration();
conf.setClass(GiraphJob.VERTEX_CLASS, DummyVertex.class, BasicVertex.class);
conf.setClass(GiraphJob.VERTEX_INDEX_CLASS, Text.class, Writable.class);
conf.setClass(GiraphJob.VERTEX_VALUE_CLASS, DoubleWritable.class, Writable.class);
graphState = mock(GraphState.class);
tac = mock(TaskAttemptContext.class);
when(tac.getConfiguration()).thenReturn(conf);
}
public void testIndexMustHaveValue() throws IOException, InterruptedException {
String input = "hi";
when(rr.getCurrentValue()).thenReturn(new Text(input));
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
vr.initialize(null, tac);
try {
vr.nextVertex();
vr.getCurrentVertex();
fail("Should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException iae) {
assertTrue(iae.getMessage().startsWith("Line did not split correctly: "));
}
}
public void testEdgesMustHaveValues() throws IOException, InterruptedException {
String input = "index\t55.66\tindex2";
when(rr.getCurrentValue()).thenReturn(new Text(input));
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
vr.initialize(null, tac);
try {
vr.nextVertex();
vr.getCurrentVertex();
fail("Should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException iae) {
assertTrue(iae.getMessage().startsWith("Line did not split correctly: "));
}
}
public static void setGraphState(BasicVertex vertex, GraphState graphState) throws Exception {
Class<? extends BasicVertex> c = BasicVertex.class;
Method m = c.getDeclaredMethod("setGraphState", GraphState.class);
m.setAccessible(true);
m.invoke(vertex, graphState);
}
public static <I extends WritableComparable, V extends Writable,
E extends Writable, M extends Writable> void assertValidVertex(Configuration conf,
GraphState<I, V, E, M> graphState, BasicVertex<I, V, E, M> actual,
I expectedId, V expectedValue, Edge<I, E>... edges)
throws Exception {
BasicVertex<I, V, E, M> expected = BspUtils.createVertex(conf);
setGraphState(expected, graphState);
// FIXME! maybe can't work if not instantiated properly
Map<I, E> edgeMap = Maps.newHashMap();
for(Edge<I, E> edge : edges) {
edgeMap.put(edge.getDestVertexId(), edge.getEdgeValue());
}
expected.initialize(expectedId, expectedValue, edgeMap, null);
assertValid(expected, actual);
}
public static
<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> void
assertValid(BasicVertex<I, V, E, M> expected, BasicVertex<I, V, E, M> actual) {
assertEquals(expected.getVertexId(), actual.getVertexId());
assertEquals(expected.getVertexValue(), actual.getVertexValue());
assertEquals(expected.getNumEdges(), actual.getNumEdges());
List<Edge<I, E>> expectedEdges = Lists.newArrayList();
List<Edge<I, E>> actualEdges = Lists.newArrayList();
for(I actualDestId : actual) {
actualEdges.add(new Edge<I, E>(actualDestId, actual.getEdgeValue(actualDestId)));
}
for(I expectedDestId : expected) {
expectedEdges.add(new Edge<I, E>(expectedDestId, expected.getEdgeValue(expectedDestId)));
}
Collections.sort(expectedEdges);
Collections.sort(actualEdges);
for(int i = 0; i < expectedEdges.size(); i++) {
assertEquals(expectedEdges.get(i), actualEdges.get(i));
}
}
public void testHappyPath() throws Exception {
String input = "Hi\t0\tCiao\t1.123\tBomdia\t2.234\tOla\t3.345";
when(rr.getCurrentValue()).thenReturn(new Text(input));
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
vr.initialize(null, tac);
assertTrue("Should have been able to add a vertex", vr.nextVertex());
BasicVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> vertex =
vr.getCurrentVertex();
setGraphState(vertex, graphState);
assertValidVertex(conf, graphState, vertex, new Text("Hi"), new DoubleWritable(0),
new Edge<Text, DoubleWritable>(new Text("Ciao"), new DoubleWritable(1.123d)),
new Edge<Text, DoubleWritable>(new Text("Bomdia"), new DoubleWritable(2.234d)),
new Edge<Text, DoubleWritable>(new Text("Ola"), new DoubleWritable(3.345d)));
assertEquals(vertex.getNumOutEdges(), 3);
}
public void testLineSanitizer() throws Exception {
String input = "Bye\t0.01\tCiao\t1.001\tTchau\t2.0001\tAdios\t3.00001";
AdjacencyListVertexReader.LineSanitizer toUpper =
new AdjacencyListVertexReader.LineSanitizer() {
@Override
public String sanitize(String s) {
return s.toUpperCase();
}
};
when(rr.getCurrentValue()).thenReturn(new Text(input));
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr, toUpper);
vr.initialize(null, tac);
assertTrue("Should have been able to read vertex", vr.nextVertex());
BasicVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> vertex =
vr.getCurrentVertex();
setGraphState(vertex, graphState);
assertValidVertex(conf, graphState, vertex,
new Text("BYE"), new DoubleWritable(0.01d),
new Edge<Text, DoubleWritable>(new Text("CIAO"), new DoubleWritable(1.001d)),
new Edge<Text, DoubleWritable>(new Text("TCHAU"), new DoubleWritable(2.0001d)),
new Edge<Text, DoubleWritable>(new Text("ADIOS"), new DoubleWritable(3.00001d)));
assertEquals(vertex.getNumOutEdges(), 3);
}
public void testDifferentSeparators() throws Exception {
String input = "alpha:42:beta:99";
when(rr.getCurrentValue()).thenReturn(new Text(input));
conf.set(AdjacencyListVertexReader.LINE_TOKENIZE_VALUE, ":");
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
vr.initialize(null, tac);
assertTrue("Should have been able to read vertex", vr.nextVertex());
BasicVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> vertex =
vr.getCurrentVertex();
setGraphState(vertex, graphState);
assertValidVertex(conf, graphState, vertex, new Text("alpha"), new DoubleWritable(42d),
new Edge<Text, DoubleWritable>(new Text("beta"), new DoubleWritable(99d)));
assertEquals(vertex.getNumOutEdges(), 1);
}
public static class DummyVertex
extends EdgeListVertex<Text, DoubleWritable,
DoubleWritable, BooleanWritable> {
@Override
public void compute(Iterator<BooleanWritable> msgIterator) throws IOException {
// ignore
}
}
}