blob: 1f5ef920f928993287b622ce1fd2ddd6d3ab3501 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.block_app.framework;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.library.Pieces;
import org.apache.giraph.block_app.library.VertexSuppliers;
import org.apache.giraph.block_app.test_setup.NumericTestGraph;
import org.apache.giraph.block_app.test_setup.TestGraphChecker;
import org.apache.giraph.block_app.test_setup.TestGraphModifier;
import org.apache.giraph.block_app.test_setup.TestGraphUtils;
import org.apache.giraph.conf.BulkConfigurator;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.function.vertex.ConsumerWithVertex;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.junit.Assert;
import org.junit.Test;
public class SendingMessagesTest {
@Test
public void createVertexOnMsgsTest() throws Exception {
TestGraphUtils.runTest(
new TestGraphModifier<LongWritable, LongWritable, Writable>() {
@Override
public void modifyGraph(NumericTestGraph<LongWritable, LongWritable, Writable> graph) {
graph.addEdge(1, 2);
}
},
new TestGraphChecker<LongWritable, LongWritable, Writable>() {
@Override
public void checkOutput(NumericTestGraph<LongWritable, LongWritable, Writable> graph) {
Assert.assertEquals(1, graph.getValue(2).get());
Assert.assertEquals(0, graph.getValue(1).get());
}
},
new BulkConfigurator() {
@Override
public void configure(GiraphConfiguration conf) {
BlockUtils.setBlockFactoryClass(conf, SendingMessagesToNeighborsBlockFactory.class);
}
});
}
@Test
public void doNotCreateVertexOnMsgsTest() throws Exception {
TestGraphUtils.runTest(
new TestGraphModifier<LongWritable, LongWritable, Writable>() {
@Override
public void modifyGraph(NumericTestGraph<LongWritable, LongWritable, Writable> graph) {
graph.addEdge(1, 2);
}
},
new TestGraphChecker<LongWritable, LongWritable, Writable>() {
@Override
public void checkOutput(NumericTestGraph<LongWritable, LongWritable, Writable> graph) {
Assert.assertNull(graph.getVertex(2));
Assert.assertEquals(0, graph.getValue(1).get());
}
},
new BulkConfigurator() {
@Override
public void configure(GiraphConfiguration conf) {
BlockUtils.setBlockFactoryClass(conf, SendingMessagesToNeighborsBlockFactory.class);
GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.set(conf, false);
}
});
}
@Test
public void createMultiMsgs() throws Exception {
TestGraphUtils.runTest(
new TestGraphModifier<LongWritable, LongWritable, Writable>() {
@Override
public void modifyGraph(NumericTestGraph<LongWritable, LongWritable, Writable> graph) {
graph.addSymmetricEdge(1, 2);
graph.addSymmetricEdge(3, 2);
}
},
new TestGraphChecker<LongWritable, LongWritable, Writable>() {
@Override
public void checkOutput(NumericTestGraph<LongWritable, LongWritable, Writable> graph) {
Assert.assertEquals(3, graph.getValue(2).get());
Assert.assertEquals(2, graph.getValue(1).get());
Assert.assertEquals(2, graph.getValue(3).get());
}
},
new BulkConfigurator() {
@Override
public void configure(GiraphConfiguration conf) {
BlockUtils.setBlockFactoryClass(conf, SendingMessagesToNeighborsBlockFactory.class);
}
});
}
public static class SendingMessagesToNeighborsBlockFactory extends TestLongNullNullBlockFactory {
@Override
protected Class<? extends Writable> getVertexValueClass(GiraphConfiguration conf) {
return LongWritable.class;
}
@Override
public Block createBlock(GiraphConfiguration conf) {
return Pieces.sendMessageToNeighbors(
"SendToNeighbors",
LongWritable.class,
VertexSuppliers.<LongWritable, LongWritable, Writable>vertexIdSupplier(),
new ConsumerWithVertex<LongWritable, LongWritable, Writable, Iterable<LongWritable>>() {
@Override
public void apply(Vertex<LongWritable, LongWritable, Writable> vertex,
Iterable<LongWritable> messages) {
long max = 0;
for (LongWritable v : messages) {
max = Math.max(max, v.get());
}
vertex.getValue().set(max);
}
});
}
}
}